refactor(sync): refactor sync

This commit is contained in:
Minghao Li 2022-10-20 19:42:22 +08:00
commit cc7a393978
50 changed files with 2725 additions and 2343 deletions

View File

@ -108,11 +108,11 @@ INSERT INTO d21001 USING meters TAGS ('California.SanFrancisco', 2) VALUES ('202
## Insert Rows From A File
Besides using `VALUES` to insert one or multiple rows, the data to be inserted can also be prepared in a CSV file with comma as separator and each field value quoted by single quotes. Table definition is not required in the CSV file. For example, if file "/tmp/csvfile.csv" contains the below data:
Besides using `VALUES` to insert one or multiple rows, the data to be inserted can also be prepared in a CSV file with comma as separator and timestamp and string field value quoted by single quotes. Table definition is not required in the CSV file. For example, if file "/tmp/csvfile.csv" contains the below data:
```
'2021-07-13 14:07:34.630', '10.2', '219', '0.32'
'2021-07-13 14:07:35.779', '10.15', '217', '0.33'
'2021-07-13 14:07:34.630', 10.2, 219, 0.32
'2021-07-13 14:07:35.779', 10.15, 217, 0.33
```
Then data in this file can be inserted by the SQL statement below:

View File

@ -64,6 +64,12 @@ dnode_option: {
The parameters that you can modify through this statement are the same as those located in the dnode configuration file. Modifications that you make through this statement take effect immediately, while modifications to the configuration file take effect when the dnode restarts.
`value` is the value of the parameter, which needs to be in character format. For example, modify the log output level of dnode 1 to debug:
```sql
ALTER DNODE 1 'debugFlag' '143';
```
## Add an Mnode
```sql

View File

@ -112,6 +112,9 @@ taosBenchmark -f <json file>
- **-u/--user <user\>** :
User name to connect to the TDengine server. Default is root.
- **-U/--supplement-insert ** :
Supplementally insert data without create database and table, optional, default is off.
- **-p/--password <passwd\>** :
The default password to connect to the TDengine server is `taosdata`.
@ -149,7 +152,6 @@ taosBenchmark -f <json file>
specify the number of columns in the super table. If both this parameter and `-b/--data-type` is set, the final result number of columns is the greater of the two. If the number specified by this parameter is greater than the number of columns specified by `-b/--data-type`, the unspecified column type defaults to INT, for example: `-l 5 -b float,double`, then the final column is `FLOAT,DOUBLE,INT,INT,INT`. If the number of columns specified is less than or equal to the number of columns specified by `-b/--data-type`, then the result is the column and type specified by `-b/--data-type`, e.g.: `-l 3 -b float,double,float,bigint`. The last column is `FLOAT,DOUBLE, FLOAT,BIGINT`.
- **-L/--partial-col-num <colNum\> ** :
Specify first numbers of columns has data. Rest of columns' data are NULL. Default is all columns have data.
- **-A/--tag-type <tagType\>** :

View File

@ -109,11 +109,11 @@ INSERT INTO d21001 USING meters TAGS ('California.SanFrancisco', 2) VALUES ('202
## 插入来自文件的数据记录
除了使用 VALUES 关键字插入一行或多行数据外,也可以把要写入的数据放在 CSV 文件中(英文逗号分隔、英文单引号括住每个值)供 SQL 指令读取。其中 CSV 文件无需表头。例如,如果 /tmp/csvfile.csv 文件的内容为:
除了使用 VALUES 关键字插入一行或多行数据外,也可以把要写入的数据放在 CSV 文件中(英文逗号分隔、时间戳和字符串类型的值需要用英文单引号括住)供 SQL 指令读取。其中 CSV 文件无需表头。例如,如果 /tmp/csvfile.csv 文件的内容为:
```
'2021-07-13 14:07:34.630', '10.2', '219', '0.32'
'2021-07-13 14:07:35.779', '10.15', '217', '0.33'
'2021-07-13 14:07:34.630', 10.2, 219, 0.32
'2021-07-13 14:07:35.779', 10.15, 217, 0.33
```
那么通过如下指令可以把这个文件中的数据写入子表中:

View File

@ -65,6 +65,12 @@ dnode_option: {
上面语法中的这些可修改配置项其配置方式与 dnode 配置文件中的配置方式相同,区别是修改是动态的立即生效,且不需要重启 dnode。
value 是参数的值,需要是字符格式。如修改 dnode 1 的日志输出级别为 debug
```sql
ALTER DNODE 1 'debugFlag' '143';
```
## 添加管理节点
```sql

View File

@ -112,6 +112,9 @@ taosBenchmark -f <json file>
- **-u/--user <user\>** :
用于连接 TDengine 服务端的用户名,默认为 root 。
- **-U/--supplement-insert ** :
写入数据而不提前建数据库和表,默认关闭。
- **-p/--password <passwd\>** :
用于连接 TDengine 服务端的密码,默认值为 taosdata。

View File

@ -83,6 +83,27 @@ typedef struct {
int32_t exprIdx;
} STupleKey;
typedef struct STuplePos {
union {
struct {
int32_t pageId;
int32_t offset;
};
STupleKey streamTupleKey;
};
} STuplePos;
typedef struct SFirstLastRes {
bool hasResult;
// used for last_row function only, isNullRes in SResultRowEntry can not be passed to downstream.So,
// this attribute is required
bool isNull;
int32_t bytes;
int64_t ts;
STuplePos pos;
char buf[];
} SFirstLastRes;
static inline int STupleKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
STupleKey* pTuple1 = (STupleKey*)pKey1;
STupleKey* pTuple2 = (STupleKey*)pKey2;

View File

@ -1191,7 +1191,6 @@ typedef struct {
int8_t strict;
int8_t cacheLast;
int8_t isTsma;
int8_t standby;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
@ -1206,6 +1205,7 @@ typedef struct {
int16_t hashPrefix;
int16_t hashSuffix;
int32_t tsdbPageSize;
int64_t reserved[8];
} SCreateVnodeReq;
int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
@ -1217,6 +1217,7 @@ typedef struct {
int32_t dnodeId;
int64_t dbUid;
char db[TSDB_DB_FNAME_LEN];
int64_t reserved[8];
} SDropVnodeReq;
int32_t tSerializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq);
@ -1225,6 +1226,7 @@ int32_t tDeserializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq
typedef struct {
int64_t dbUid;
char db[TSDB_DB_FNAME_LEN];
int64_t reserved[8];
} SCompactVnodeReq;
int32_t tSerializeSCompactVnodeReq(void* buf, int32_t bufLen, SCompactVnodeReq* pReq);
@ -1244,13 +1246,23 @@ typedef struct {
int8_t walLevel;
int8_t strict;
int8_t cacheLast;
int64_t reserved[8];
} SAlterVnodeConfigReq;
int32_t tSerializeSAlterVnodeConfigReq(void* buf, int32_t bufLen, SAlterVnodeConfigReq* pReq);
int32_t tDeserializeSAlterVnodeConfigReq(void* buf, int32_t bufLen, SAlterVnodeConfigReq* pReq);
typedef struct {
int32_t vgId;
int8_t strict;
int8_t selfIndex;
int8_t replica;
SReplica replicas[TSDB_MAX_REPLICA];
} SAlterVnodeReq;
int64_t reserved[8];
} SAlterVnodeReplicaReq;
int32_t tSerializeSAlterVnodeReq(void* buf, int32_t bufLen, SAlterVnodeReq* pReq);
int32_t tDeserializeSAlterVnodeReq(void* buf, int32_t bufLen, SAlterVnodeReq* pReq);
int32_t tSerializeSAlterVnodeReplicaReq(void* buf, int32_t bufLen, SAlterVnodeReplicaReq* pReq);
int32_t tDeserializeSAlterVnodeReplicaReq(void* buf, int32_t bufLen, SAlterVnodeReplicaReq* pReq);
typedef struct {
SMsgHead header;
@ -1503,14 +1515,6 @@ typedef struct {
int32_t tSerializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq);
int32_t tDeserializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq);
typedef struct {
int32_t dnodeId;
int8_t standby;
} SSetStandbyReq;
int32_t tSerializeSSetStandbyReq(void* buf, int32_t bufLen, SSetStandbyReq* pReq);
int32_t tDeserializeSSetStandbyReq(void* buf, int32_t bufLen, SSetStandbyReq* pReq);
typedef struct {
char queryStrId[TSDB_QUERY_ID_LEN];
} SKillQueryReq;

View File

@ -262,8 +262,8 @@ enum {
TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_SEND, "sync-snapshot-send", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_RSP, "sync-snapshot-rsp", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_LEADER_TRANSFER, "sync-leader-transfer", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_SET_MNODE_STANDBY, "set-mnode-standby", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_SET_VNODE_STANDBY, "set-vnode-standby", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_SET_MNODE_STANDBY, "set-mnode-standby", NULL, NULL) // no longer used
TD_DEF_MSG_TYPE(TDMT_SYNC_SET_VNODE_STANDBY, "set-vnode-standby", NULL, NULL) // no longer used
TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT, "sync-heartbeat", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT_REPLY, "sync-heartbeat-reply", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL)

View File

@ -129,6 +129,7 @@ typedef enum EFunctionType {
FUNCTION_TYPE_TO_COLUMN,
FUNCTION_TYPE_GROUP_KEY,
FUNCTION_TYPE_CACHE_LAST_ROW,
FUNCTION_TYPE_CACHE_LAST,
// distributed splitting functions
FUNCTION_TYPE_APERCENTILE_PARTIAL = 4000,
@ -216,6 +217,8 @@ bool fmIsKeepOrderFunc(int32_t funcId);
bool fmIsCumulativeFunc(int32_t funcId);
bool fmIsInterpPseudoColumnFunc(int32_t funcId);
void getLastCacheDataType(SDataType* pType);
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc);
typedef enum EFuncDataRequired {

View File

@ -979,6 +979,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
const SArray* container = &topic_list->container;
int32_t sz = taosArrayGetSize(container);
void* buf = NULL;
SMsgSendInfo* sendInfo = NULL;
SCMSubscribeReq req = {0};
int32_t code = -1;
@ -1016,7 +1017,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
void* abuf = buf;
tSerializeSCMSubscribeReq(&abuf, &req);
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) goto FAIL;
SMqSubscribeCbParam param = {
@ -1046,6 +1047,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
// avoid double free if msg is sent
buf = NULL;
sendInfo = NULL;
tsem_wait(&param.rspSem);
tsem_destroy(&param.rspSem);
@ -1078,8 +1080,9 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
code = 0;
FAIL:
if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
taosArrayDestroyP(req.topicNames, taosMemoryFree);
taosMemoryFree(buf);
taosMemoryFree(sendInfo);
return code;
}
@ -1613,7 +1616,11 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
if (rspWrapper == NULL) {
taosReadAllQitems(tmq->mqueue, tmq->qall);
taosGetQitem(tmq->qall, (void**)&rspWrapper);
if (rspWrapper == NULL) return NULL;
if (rspWrapper == NULL) {
tscDebug("consumer %" PRId64 " mqueue empty", tmq->consumerId);
return NULL;
}
}
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
@ -1711,6 +1718,8 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
void* rspObj;
int64_t startTime = taosGetTimestampMs();
tscDebug("consumer:%" PRId64 ", start poll at %" PRId64, tmq->consumerId, startTime);
#if 0
tmqHandleAllDelayedTask(tmq);
tmqPollImpl(tmq, timeout);
@ -1745,15 +1754,18 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
rspObj = tmqHandleAllRsp(tmq, timeout, false);
if (rspObj) {
tscDebug("consumer:%" PRId64 ", return rsp", tmq->consumerId);
return (TAOS_RES*)rspObj;
} else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
tscDebug("consumer:%" PRId64 ", return null since no committed offset", tmq->consumerId);
return NULL;
}
if (timeout != -1) {
int64_t endTime = taosGetTimestampMs();
int64_t leftTime = endTime - startTime;
if (leftTime > timeout) {
tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp, start time %" PRId64 ", end time %" PRId64,
tmq->consumerId, tmq->epoch, startTime, endTime);
return NULL;
}
tsem_timewait(&tmq->rspSem, leftTime * 1000);

View File

@ -21,7 +21,8 @@
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp) {
pEp->port = 0;
strcpy(pEp->fqdn, ep);
memset(pEp->fqdn, 0, TSDB_FQDN_LEN);
strncpy(pEp->fqdn, ep, TSDB_FQDN_LEN - 1);
char* temp = strchr(pEp->fqdn, ':');
if (temp) {

View File

@ -3766,7 +3766,6 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
if (tEncodeI8(&encoder, pReq->compression) < 0) return -1;
if (tEncodeI8(&encoder, pReq->strict) < 0) return -1;
if (tEncodeI8(&encoder, pReq->cacheLast) < 0) return -1;
if (tEncodeI8(&encoder, pReq->standby) < 0) return -1;
if (tEncodeI8(&encoder, pReq->replica) < 0) return -1;
if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
@ -3795,6 +3794,9 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
if (tEncodeI16(&encoder, pReq->hashPrefix) < 0) return -1;
if (tEncodeI16(&encoder, pReq->hashSuffix) < 0) return -1;
if (tEncodeI32(&encoder, pReq->tsdbPageSize) < 0) return -1;
for (int32_t i = 0; i < 8; ++i) {
if (tEncodeI64(&encoder, pReq->reserved[i]) < 0) return -1;
}
tEndEncode(&encoder);
@ -3832,7 +3834,6 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
if (tDecodeI8(&decoder, &pReq->compression) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->cacheLast) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->standby) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->selfIndex) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
@ -3871,6 +3872,9 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
if (tDecodeI16(&decoder, &pReq->hashPrefix) < 0) return -1;
if (tDecodeI16(&decoder, &pReq->hashSuffix) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->tsdbPageSize) < 0) return -1;
for (int32_t i = 0; i < 8; ++i) {
if (tDecodeI64(&decoder, &pReq->reserved[i]) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
@ -3892,6 +3896,9 @@ int32_t tSerializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq)
if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1;
if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
for (int32_t i = 0; i < 8; ++i) {
if (tEncodeI64(&encoder, pReq->reserved[i]) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -3908,6 +3915,9 @@ int32_t tDeserializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
for (int32_t i = 0; i < 8; ++i) {
if (tDecodeI64(&decoder, &pReq->reserved[i]) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
@ -3921,6 +3931,9 @@ int32_t tSerializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq *
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
for (int32_t i = 0; i < 8; ++i) {
if (tEncodeI64(&encoder, pReq->reserved[i]) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -3935,13 +3948,16 @@ int32_t tDeserializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
for (int32_t i = 0; i < 8; ++i) {
if (tDecodeI64(&decoder, &pReq->reserved[i]) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq) {
int32_t tSerializeSAlterVnodeConfigReq(void *buf, int32_t bufLen, SAlterVnodeConfigReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -3959,11 +3975,8 @@ int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq
if (tEncodeI8(&encoder, pReq->walLevel) < 0) return -1;
if (tEncodeI8(&encoder, pReq->strict) < 0) return -1;
if (tEncodeI8(&encoder, pReq->cacheLast) < 0) return -1;
if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1;
if (tEncodeI8(&encoder, pReq->replica) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
SReplica *pReplica = &pReq->replicas[i];
if (tEncodeSReplica(&encoder, pReplica) < 0) return -1;
for (int32_t i = 0; i < 8; ++i) {
if (tEncodeI64(&encoder, pReq->reserved[i]) < 0) return -1;
}
tEndEncode(&encoder);
@ -3972,7 +3985,7 @@ int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq
return tlen;
}
int32_t tDeserializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq) {
int32_t tDeserializeSAlterVnodeConfigReq(void *buf, int32_t bufLen, SAlterVnodeConfigReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
@ -3990,12 +4003,54 @@ int32_t tDeserializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pR
if (tDecodeI8(&decoder, &pReq->walLevel) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->cacheLast) < 0) return -1;
for (int32_t i = 0; i < 8; ++i) {
if (tDecodeI64(&decoder, &pReq->reserved[i]) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSAlterVnodeReplicaReq(void *buf, int32_t bufLen, SAlterVnodeReplicaReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1;
if (tEncodeI8(&encoder, pReq->strict) < 0) return -1;
if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1;
if (tEncodeI8(&encoder, pReq->replica) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
SReplica *pReplica = &pReq->replicas[i];
if (tEncodeSReplica(&encoder, pReplica) < 0) return -1;
}
for (int32_t i = 0; i < 8; ++i) {
if (tEncodeI64(&encoder, pReq->reserved[i]) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSAlterVnodeReplicaReq(void *buf, int32_t bufLen, SAlterVnodeReplicaReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->selfIndex) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
SReplica *pReplica = &pReq->replicas[i];
if (tDecodeSReplica(&decoder, pReplica) < 0) return -1;
}
for (int32_t i = 0; i < 8; ++i) {
if (tDecodeI64(&decoder, &pReq->reserved[i]) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
@ -4218,33 +4273,6 @@ int32_t tDeserializeSDCreateMnodeReq(void *buf, int32_t bufLen, SDCreateMnodeReq
return 0;
}
int32_t tSerializeSSetStandbyReq(void *buf, int32_t bufLen, SSetStandbyReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1;
if (tEncodeI8(&encoder, pReq->standby) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSSetStandbyReq(void *buf, int32_t bufLen, SSetStandbyReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->standby) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSAuthReq(void *buf, int32_t bufLen, SAuthReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);

View File

@ -90,8 +90,10 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, in
SName* toName(int32_t acctId, const char* pDbName, const char* pTableName, SName* pName) {
pName->type = TSDB_TABLE_NAME_T;
pName->acctId = acctId;
strcpy(pName->dbname, pDbName);
strcpy(pName->tname, pTableName);
memset(pName->dbname, 0, TSDB_DB_NAME_LEN);
strncpy(pName->dbname, pDbName, TSDB_DB_NAME_LEN - 1);
memset(pName->tname, 0, TSDB_TABLE_NAME_LEN);
strncpy(pName->tname, pTableName, TSDB_TABLE_NAME_LEN - 1);
return pName;
}

View File

@ -196,10 +196,6 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_MNODE_STANDBY, mmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_MNODE_STANDBY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_VNODE_STANDBY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER;

View File

@ -88,6 +88,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
SArray *vmGetMsgHandles();
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
// vmFile.c
int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);

View File

@ -265,6 +265,45 @@ _OVER:
return code;
}
int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SAlterVnodeReplicaReq alterReq = {0};
if (tSerializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
int32_t vgId = alterReq.vgId;
dInfo("vgId:%d, start to alter vnode replica", alterReq.vgId);
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
if (pVnode == NULL) {
dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
return -1;
}
dInfo("vgId:%d, start to close vnode", vgId);
vmCloseVnode(pMgmt, pVnode);
char path[TSDB_FILENAME_LEN] = {0};
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
if (vnodeAlter(path, &alterReq, pMgmt->pTfs) < 0) {
dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
return -1;
}
dInfo("vgId:%d, start to open vnode", vgId);
if (vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb) < 0) {
dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
return -1;
}
dInfo("vgId:%d, vnode config is altered", vgId);
return 0;
}
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SDropVnodeReq dropReq = {0};
if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
@ -348,7 +387,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
@ -370,7 +409,6 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_VNODE_STANDBY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER;

View File

@ -40,6 +40,9 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
case TDMT_DND_DROP_VNODE:
code = vmProcessDropVnodeReq(pMgmt, pMsg);
break;
case TDMT_VND_ALTER_REPLICA:
code = vmProcessAlterVnodeReq(pMgmt, pMsg);
break;
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
dGError("msg:%p, not processed in vnode-mgmt queue", pMsg);

View File

@ -70,7 +70,7 @@ TEST_F(DndTestVnode, 01_Create_Vnode) {
TEST_F(DndTestVnode, 02_Alter_Vnode) {
for (int i = 0; i < 3; ++i) {
SAlterVnodeReq alterReq = {0};
SAlterVnodeConfigReq alterReq = {0};
alterReq.vgVersion = 2;
alterReq.daysPerFile = 10;
alterReq.daysToKeep0 = 3650;

View File

@ -38,7 +38,7 @@ int32_t mndAllocSmaVgroup(SMnode *, SDbObj *pDb, SVgObj *pVgroup);
int32_t mndAllocVgroup(SMnode *, SDbObj *pDb, SVgObj **ppVgroups);
int32_t mndAddVnodeToVgroup(SMnode *, SVgObj *pVgroup, SArray *pArray);
int32_t mndRemoveVnodeFromVgroup(SMnode *, SVgObj *pVgroup, SArray *pArray, SVnodeGid *pDelVgid);
int32_t mndAddCreateVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, bool standby);
int32_t mndAddCreateVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid);
int32_t mndAddAlterVnodeConfirmAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup);
int32_t mndAddAlterVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType);
int32_t mndAddDropVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, bool isRedo);
@ -47,9 +47,8 @@ int32_t mndSetMoveVgroupsInfoToTrans(SMnode *, STrans *pTrans, int32_t dropDnode
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
SArray *pArray);
void *mndBuildCreateVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *cntlen, bool standby);
void *mndBuildCreateVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
void *mndBuildDropVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
void *mndBuildAlterVnodeReq(SMnode *, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid);
#ifdef __cplusplus

View File

@ -447,7 +447,7 @@ static int32_t mndSetCreateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pVgid, false) != 0) {
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pVgid) != 0) {
return -1;
}
}

View File

@ -56,8 +56,6 @@ int32_t mndInitMnode(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_MNODE, mndProcessDropMnodeReq);
mndSetMsgHandle(pMnode, TDMT_DND_DROP_MNODE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_SYNC_SET_MNODE_STANDBY_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_SYNC_SET_VNODE_STANDBY_RSP, mndTransProcessRsp);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndRetrieveMnodes);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndCancelGetNextMnode);

View File

@ -454,7 +454,7 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
pVgroup->pTsma = pSmaReq;
int32_t contLen = 0;
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen, false);
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
taosMemoryFreeClear(pSmaReq);
if (pReq == NULL) return -1;

View File

@ -196,8 +196,7 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
sdbRelease(pSdb, pVgroup);
}
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen,
bool standby) {
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
SCreateVnodeReq createReq = {0};
createReq.vgId = pVgroup->vgId;
memcpy(createReq.db, pDb->name, TSDB_DB_FNAME_LEN);
@ -227,7 +226,6 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
createReq.hashMethod = pDb->cfg.hashMethod;
createReq.numOfRetensions = pDb->cfg.numOfRetensions;
createReq.pRetensions = pDb->cfg.pRetensions;
createReq.standby = standby;
createReq.isTsma = pVgroup->isTsma;
createReq.pTsma = pVgroup->pTsma;
createReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
@ -279,8 +277,8 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
return pReq;
}
void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
SAlterVnodeReq alterReq = {0};
static void *mndBuildAlterVnodeConfigReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
SAlterVnodeConfigReq alterReq = {0};
alterReq.vgVersion = pVgroup->version;
alterReq.buffer = pDb->cfg.buffer;
alterReq.pageSize = pDb->cfg.pageSize;
@ -294,6 +292,34 @@ void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_
alterReq.walLevel = pDb->cfg.walLevel;
alterReq.strict = pDb->cfg.strict;
alterReq.cacheLast = pDb->cfg.cacheLast;
int32_t contLen = tSerializeSAlterVnodeConfigReq(NULL, 0, &alterReq);
if (contLen < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
contLen += sizeof(SMsgHead);
void *pReq = taosMemoryMalloc(contLen);
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
SMsgHead *pHead = pReq;
pHead->contLen = htonl(contLen);
pHead->vgId = htonl(pVgroup->vgId);
tSerializeSAlterVnodeConfigReq((char *)pReq + sizeof(SMsgHead), contLen, &alterReq);
*pContLen = contLen;
return pReq;
}
static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup,
int32_t *pContLen) {
SAlterVnodeReplicaReq alterReq = {0};
alterReq.vgId = pVgroup->vgId;
alterReq.strict = pDb->cfg.strict;
alterReq.replica = pVgroup->replica;
for (int32_t v = 0; v < pVgroup->replica; ++v) {
@ -308,14 +334,22 @@ void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_
pReplica->port = pVgidDnode->port;
memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
mndReleaseDnode(pMnode, pVgidDnode);
if (pDnode->id == pVgid->dnodeId) {
alterReq.selfIndex = v;
}
}
int32_t contLen = tSerializeSAlterVnodeReq(NULL, 0, &alterReq);
if (alterReq.selfIndex == -1) {
terrno = TSDB_CODE_MND_APP_ERROR;
return NULL;
}
int32_t contLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &alterReq);
if (contLen < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
contLen += sizeof(SMsgHead);
void *pReq = taosMemoryMalloc(contLen);
if (pReq == NULL) {
@ -323,38 +357,7 @@ void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_
return NULL;
}
SMsgHead *pHead = pReq;
pHead->contLen = htonl(contLen);
pHead->vgId = htonl(pVgroup->vgId);
tSerializeSAlterVnodeReq((char *)pReq + sizeof(SMsgHead), contLen, &alterReq);
*pContLen = contLen;
return pReq;
}
void *mndBuildSetVnodeStandbyReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
SSetStandbyReq standbyReq = {0};
standbyReq.dnodeId = pDnode->id;
standbyReq.standby = 1;
int32_t contLen = tSerializeSSetStandbyReq(NULL, 0, &standbyReq);
if (contLen < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
contLen += sizeof(SMsgHead);
void *pReq = taosMemoryMalloc(contLen);
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
tSerializeSSetStandbyReq((char *)pReq + sizeof(SMsgHead), contLen, &standbyReq);
SMsgHead *pHead = pReq;
pHead->contLen = htonl(contLen);
pHead->vgId = htonl(pVgroup->vgId);
tSerializeSAlterVnodeReplicaReq(pReq, contLen, &alterReq);
*pContLen = contLen;
return pReq;
}
@ -948,8 +951,7 @@ _OVER:
return 0;
}
int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
bool standby) {
int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid) {
STransAction action = {0};
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
@ -958,7 +960,7 @@ int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVg
mndReleaseDnode(pMnode, pDnode);
int32_t contLen = 0;
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen, standby);
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
if (pReq == NULL) return -1;
action.pCont = pReq;
@ -1007,7 +1009,7 @@ int32_t mndAddAlterVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgO
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
int32_t contLen = 0;
void *pReq = mndBuildAlterVnodeReq(pMnode, pDb, pVgroup, &contLen);
void *pReq = mndBuildAlterVnodeConfigReq(pMnode, pDb, pVgroup, &contLen);
if (pReq == NULL) return -1;
action.pCont = pReq;
@ -1022,41 +1024,6 @@ int32_t mndAddAlterVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgO
return 0;
}
static int32_t mndAddSetVnodeStandByAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
SVnodeGid *pVgid, bool isRedo) {
STransAction action = {0};
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode == NULL) return -1;
action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode);
int32_t contLen = 0;
void *pReq = mndBuildSetVnodeStandbyReq(pMnode, pDnode, pDb, pVgroup, &contLen);
if (pReq == NULL) return -1;
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_SYNC_SET_VNODE_STANDBY;
action.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED;
// Keep retrying until the target vnode is not the leader
action.retryCode = TSDB_CODE_SYN_IS_LEADER;
if (isRedo) {
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
} else {
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
}
return 0;
}
int32_t mndAddDropVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
bool isRedo) {
STransAction action = {0};
@ -1102,7 +1069,7 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
mInfo("vgId:%d, will add 1 vnodes", pVgroup->vgId);
if (mndAddVnodeToVgroup(pMnode, &newVg, pArray) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[newVg.replica - 1], true) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[newVg.replica - 1]) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
@ -1111,7 +1078,6 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
SVnodeGid del = newVg.vnodeGid[vnIndex];
newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
if (mndAddSetVnodeStandByAction(pMnode, pTrans, pDb, pVgroup, &del, true) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
@ -1185,7 +1151,7 @@ static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDb
pGid->dnodeId = newDnodeId;
pGid->syncState = TAOS_SYNC_STATE_ERROR;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pGid, true) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pGid) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) return -1;
@ -1212,7 +1178,6 @@ static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, S
memcpy(pGid, &pVgroup->vnodeGid[pVgroup->replica], sizeof(SVnodeGid));
memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
if (mndAddSetVnodeStandByAction(pMnode, pTrans, pDb, pVgroup, &delGid, true) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, &delGid, true) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) return -1;
@ -1580,36 +1545,32 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
mndTransSetSerial(pTrans);
if (newVgroup.replica < pNewDb->cfg.replications) {
mInfo("db:%s, vgId:%d, vn:0 dnode:%d, will add 2 vnodes", pVgroup->dbName, pVgroup->vgId,
if (newVgroup.replica == 1 && pNewDb->cfg.replications == 3) {
mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
pVgroup->vnodeGid[0].dnodeId);
if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[1], true) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[2], true) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[1]) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[2]) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
} else if (newVgroup.replica > pNewDb->cfg.replications) {
mInfo("db:%s, vgId:%d, will remove 2 vnodes", pVgroup->dbName, pVgroup->vgId);
}
if (newVgroup.replica == 3 && pNewDb->cfg.replications == 1) {
mInfo("db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pVgroup->dbName,
pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId, pVgroup->vnodeGid[2].dnodeId);
SVnodeGid del1 = {0};
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del1) != 0) return -1;
if (mndAddSetVnodeStandByAction(pMnode, pTrans, pNewDb, pVgroup, &del1, true) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
SVnodeGid del2 = {0};
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del1) != 0) return -1;
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del2) != 0) return -1;
if (mndAddSetVnodeStandByAction(pMnode, pTrans, pNewDb, pVgroup, &del2, true) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del2, true) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
} else {
return -1;
}
{
@ -1660,13 +1621,12 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj
if (newVg1.replica == 1) {
if (mndAddVnodeToVgroup(pMnode, &newVg1, pArray) != 0) goto _OVER;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg1, &newVg1.vnodeGid[1], true) != 0) goto _OVER;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg1, &newVg1.vnodeGid[1]) != 0) goto _OVER;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg1, TDMT_VND_ALTER_REPLICA) != 0) goto _OVER;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
} else if (newVg1.replica == 3) {
SVnodeGid del1 = {0};
if (mndRemoveVnodeFromVgroup(pMnode, &newVg1, pArray, &del1) != 0) goto _OVER;
if (mndAddSetVnodeStandByAction(pMnode, pTrans, pDb, pVgroup, &del1, true) != 0) goto _OVER;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg1, TDMT_VND_ALTER_REPLICA) != 0) goto _OVER;
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg1, &del1, true) != 0) goto _OVER;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;

View File

@ -50,6 +50,7 @@ extern const SVnodeCfg vnodeCfgDefault;
int32_t vnodeInit(int32_t nthreads);
void vnodeCleanup();
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs);
void vnodeDestroy(const char *path, STfs *pTfs);
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
void vnodePreClose(SVnode *pVnode);

View File

@ -299,6 +299,11 @@ int32_t tsdbMerge(STsdb *pTsdb);
#define TSDB_CACHE_LAST(c) (((c).cacheLast & 2) > 0)
// tsdbCache ==============================================================================================
typedef struct {
TSKEY ts;
SColVal colVal;
} SLastCol;
int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(STsdb *pTsdb);
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb *pTsdb);

View File

@ -15,11 +15,6 @@
#include "tsdb.h"
typedef struct {
TSKEY ts;
SColVal colVal;
} SLastCol;
int32_t tsdbOpenCache(STsdb *pTsdb) {
int32_t code = 0;
SLRUCache *pCache = NULL;
@ -61,9 +56,18 @@ static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
*len = sizeof(uint64_t);
}
static void deleteTableCacheLastrow(const void *key, size_t keyLen, void *value) { taosMemoryFree(value); }
static void deleteTableCacheLast(const void *key, size_t keyLen, void *value) {
SArray *pLastArray = (SArray *)value;
int16_t nCol = taosArrayGetSize(pLastArray);
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pLastArray, iCol);
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type) && pLastCol->colVal.value.nData > 0) {
taosMemoryFree(pLastCol->colVal.value.pData);
}
}
static void deleteTableCacheLast(const void *key, size_t keyLen, void *value) { taosArrayDestroy(value); }
taosArrayDestroy(value);
}
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
int32_t code = 0;
@ -75,13 +79,23 @@ int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
getTableCacheKey(uid, 0, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
STSRow *pRow = (STSRow *)taosLRUCacheValue(pCache, h);
if (pRow->ts <= eKey) {
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
bool invalidate = false;
int16_t nCol = taosArrayGetSize(pLast);
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
if (eKey >= tTsVal->ts) {
invalidate = true;
break;
}
}
if (invalidate) {
taosLRUCacheRelease(pCache, h, true);
} else {
taosLRUCacheRelease(pCache, h, false);
}
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
}
@ -130,14 +144,23 @@ int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
getTableCacheKey(uid, 0, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
STSRow *pRow = (STSRow *)taosLRUCacheValue(pCache, h);
if (pRow->ts <= eKey) {
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
bool invalidate = false;
int16_t nCol = taosArrayGetSize(pLast);
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
if (eKey >= tTsVal->ts) {
invalidate = true;
break;
}
}
if (invalidate) {
taosLRUCacheRelease(pCache, h, true);
} else {
taosLRUCacheRelease(pCache, h, false);
}
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
}
// getTableCacheKey(uid, "l", key, &keyLen);
@ -177,6 +200,46 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, ST
getTableCacheKey(uid, 0, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
TSKEY keyTs = row->ts;
bool invalidate = false;
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
int16_t nCol = taosArrayGetSize(pLast);
int16_t iCol = 0;
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
if (keyTs > tTsVal->ts) {
STColumn *pTColumn = &pTSchema->columns[0];
SColVal tColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = keyTs});
taosArraySet(pLast, iCol, &(SLastCol){.ts = keyTs, .colVal = tColVal});
}
for (++iCol; iCol < nCol; ++iCol) {
SLastCol *tTsVal1 = (SLastCol *)taosArrayGet(pLast, iCol);
if (keyTs >= tTsVal1->ts) {
SColVal *tColVal = &tTsVal1->colVal;
SColVal colVal = {0};
tTSRowGetVal(row, pTSchema, iCol, &colVal);
if (!COL_VAL_IS_NONE(&colVal)) {
if (keyTs == tTsVal1->ts && !COL_VAL_IS_NONE(tColVal)) {
invalidate = true;
break;
}
} else {
taosArraySet(pLast, iCol, &(SLastCol){.ts = keyTs, .colVal = colVal});
}
}
}
_invalidate:
taosMemoryFreeClear(pTSchema);
taosLRUCacheRelease(pCache, h, invalidate);
/*
cacheRow = (STSRow *)taosLRUCacheValue(pCache, h);
if (row->ts >= cacheRow->ts) {
if (row->ts == cacheRow->ts) {
@ -218,9 +281,9 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, ST
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
/* tsdbCacheInsertLastrow(pCache, uid, row, dup); */
}
// tsdbCacheInsertLastrow(pCache, uid, row, dup);
}
}*/
} /*else {
if (dup) {
cacheRow = tdRowDup(row);
@ -456,6 +519,11 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
return code;
}
if (state->pDataFReader != NULL) {
tsdbDataFReaderClose(&state->pDataFReader);
state->pDataFReader = NULL;
}
code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet);
if (code) goto _err;
@ -599,6 +667,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
*/
state->pBlockIdx = taosArraySearch(state->aBlockIdx, state->pBlockIdxExp, tCmprBlockIdx, TD_EQ);
if (!state->pBlockIdx) {
tsdbDataFReaderClose(&state->pDataFReader);
state->pDataFReader = NULL;
goto _next_fileset;
}
@ -1049,7 +1119,7 @@ _err:
return code;
}
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRow) {
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppColArray) {
int32_t code = 0;
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
@ -1057,7 +1127,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
int16_t iCol = 0;
int16_t noneCol = 0;
bool setNoneCol = false;
SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal));
SArray *pColArray = taosArrayInit(nCol, sizeof(SLastCol));
SColVal *pColVal = &(SColVal){0};
TSKEY lastRowTs = TSKEY_MAX;
@ -1073,12 +1143,15 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
break;
}
TSKEY rowTs = TSDBROW_TS(pRow);
if (lastRowTs == TSKEY_MAX) {
lastRowTs = TSDBROW_TS(pRow);
lastRowTs = rowTs;
STColumn *pTColumn = &pTSchema->columns[0];
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = lastRowTs});
if (taosArrayPush(pColArray, pColVal) == NULL) {
if (taosArrayPush(pColArray, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal}) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
@ -1086,7 +1159,18 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
for (iCol = 1; iCol < nCol; ++iCol) {
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
if (taosArrayPush(pColArray, pColVal) == NULL) {
SLastCol lastCol = {.ts = lastRowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData);
if (lastCol.colVal.value.pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
}
if (taosArrayPush(pColArray, &lastCol) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
@ -1097,15 +1181,15 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
}
}
if (!setNoneCol) {
// goto build the result ts row
// done, goto return pColArray
break;
} else {
continue;
}
}
if ((TSDBROW_TS(pRow) < lastRowTs)) {
// goto build the result ts row
if ((rowTs < lastRowTs)) {
// done, goto return pColArray
break;
}
@ -1117,7 +1201,21 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {
taosArraySet(pColArray, iCol, pColVal);
SLastCol lastCol = {.ts = rowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol);
taosMemoryFree(pLastCol->colVal.value.pData);
lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData);
if (lastCol.colVal.value.pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
}
taosArraySet(pColArray, iCol, &lastCol);
} else if (COL_VAL_IS_NONE(tColVal) && COL_VAL_IS_NONE(pColVal) && !setNoneCol) {
noneCol = iCol;
setNoneCol = true;
@ -1127,15 +1225,14 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
// build the result ts row here
*dup = false;
if (taosArrayGetSize(pColArray) == nCol) {
code = tdSTSRowNew(pColArray, pTSchema, ppRow);
if (code) goto _err;
if (taosArrayGetSize(pColArray) != nCol) {
*ppColArray = NULL;
taosArrayDestroy(pColArray);
} else {
*ppRow = NULL;
*ppColArray = pColArray;
}
nextRowIterClose(&iter);
taosArrayDestroy(pColArray);
taosMemoryFreeClear(pTSchema);
return code;
@ -1185,7 +1282,18 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
for (iCol = 1; iCol < nCol; ++iCol) {
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
if (taosArrayPush(pColArray, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal}) == NULL) {
SLastCol lastCol = {.ts = lastRowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData);
if (lastCol.colVal.value.pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
}
if (taosArrayPush(pColArray, &lastCol) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
@ -1196,7 +1304,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
}
}
if (!setNoneCol) {
// goto build the result ts row
// done, goto return pColArray
break;
} else {
continue;
@ -1207,11 +1315,26 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
setNoneCol = false;
for (iCol = noneCol; iCol < nCol; ++iCol) {
// high version's column value
SColVal *tColVal = (SColVal *)taosArrayGet(pColArray, iCol);
SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol);
SColVal *tColVal = &lastColVal->colVal;
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
if (!COL_VAL_IS_VALUE(tColVal) && COL_VAL_IS_VALUE(pColVal)) {
taosArraySet(pColArray, iCol, &(SLastCol){.ts = rowTs, .colVal = *pColVal});
SLastCol lastCol = {.ts = rowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol);
taosMemoryFree(pLastCol->colVal.value.pData);
lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData);
if (lastCol.colVal.value.pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
}
taosArraySet(pColArray, iCol, &lastCol);
} else if (!COL_VAL_IS_VALUE(tColVal) && !COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
noneCol = iCol;
setNoneCol = true;
@ -1219,7 +1342,6 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
}
} while (setNoneCol);
// build the result ts row here
if (taosArrayGetSize(pColArray) <= 0) {
*ppLastArray = NULL;
taosArrayDestroy(pColArray);
@ -1252,13 +1374,13 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH
h = taosLRUCacheLookup(pCache, key, keyLen);
if (!h) {
STSRow *pRow = NULL;
SArray *pArray = NULL;
bool dup = false; // which is always false for now
code = mergeLastRow(uid, pTsdb, &dup, &pRow);
code = mergeLastRow(uid, pTsdb, &dup, &pArray);
// if table's empty or error, return code of -1
if (code < 0 || pRow == NULL) {
if (!dup && pRow) {
taosMemoryFree(pRow);
if (code < 0 || pArray == NULL) {
if (!dup && pArray) {
taosArrayDestroy(pArray);
}
taosThreadMutexUnlock(&pTsdb->lruMutex);
@ -1268,9 +1390,9 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH
return 0;
}
_taos_lru_deleter_t deleter = deleteTableCacheLastrow;
LRUStatus status =
taosLRUCacheInsert(pCache, key, keyLen, pRow, TD_ROW_LEN(pRow), deleter, NULL, TAOS_LRU_PRIORITY_LOW);
size_t charge = pArray->capacity * pArray->elemSize + sizeof(*pArray);
_taos_lru_deleter_t deleter = deleteTableCacheLast;
LRUStatus status = taosLRUCacheInsert(pCache, key, keyLen, pArray, charge, deleter, NULL, TAOS_LRU_PRIORITY_LOW);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
@ -1328,15 +1450,17 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand
SArray *pLastArray = NULL;
code = mergeLast(uid, pTsdb, &pLastArray);
// if table's empty or error, return code of -1
// if (code < 0 || pRow == NULL) {
if (code < 0 || pLastArray == NULL) {
taosThreadMutexUnlock(&pTsdb->lruMutex);
*handle = NULL;
return 0;
}
size_t charge = pLastArray->capacity * pLastArray->elemSize + sizeof(*pLastArray);
_taos_lru_deleter_t deleter = deleteTableCacheLast;
LRUStatus status = taosLRUCacheInsert(pCache, key, keyLen, pLastArray, pLastArray->capacity, deleter, NULL,
TAOS_LRU_PRIORITY_LOW);
LRUStatus status =
taosLRUCacheInsert(pCache, key, keyLen, pLastArray, charge, deleter, NULL, TAOS_LRU_PRIORITY_LOW);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}

View File

@ -29,31 +29,71 @@ typedef struct SCacheRowsReader {
SArray* pTableList; // table id list
} SCacheRowsReader;
static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds) {
#define HASTYPE(_type, _t) (((_type) & (_t)) == (_t))
static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
void** pRes) {
ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock));
int32_t numOfRows = pBlock->info.rows;
SColVal colVal = {0};
if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[i]);
if (slotIds[i] == -1) { // the primary timestamp
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
p->ts = pColVal->ts;
p->bytes = TSDB_KEYSIZE;
*(int64_t*)p->buf = pColVal->ts;
} else {
int32_t slotId = slotIds[i];
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
p->ts = pColVal->ts;
p->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal);
if (!p->isNull) {
if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
varDataSetLen(p->buf, pColVal->colVal.value.nData);
memcpy(varDataVal(p->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData);
p->bytes = pColVal->colVal.value.nData + VARSTR_HEADER_SIZE; // binary needs to plus the header size
} else {
memcpy(p->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes);
p->bytes = pReader->pSchema->columns[slotId].bytes;
}
}
}
// pColInfoData->info.bytes includes the VARSTR_HEADER_SIZE, need to substruct it
p->hasResult = true;
varDataSetLen(pRes[i], pColInfoData->info.bytes - VARSTR_HEADER_SIZE);
colDataAppend(pColInfoData, numOfRows, (const char*)pRes[i], false);
}
} else {
ASSERT(HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW));
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
if (slotIds[i] == -1) {
colDataAppend(pColInfoData, numOfRows, (const char*)&pRow->ts, false);
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
colDataAppend(pColInfoData, numOfRows, (const char*)&pColVal->ts, false);
} else {
int32_t slotId = slotIds[i];
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
SColVal* pVal = &pColVal->colVal;
tTSRowGetVal(pRow, pReader->pSchema, slotId, &colVal);
if (IS_VAR_DATA_TYPE(colVal.type)) {
if (!COL_VAL_IS_VALUE(&colVal)) {
if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
if (!COL_VAL_IS_VALUE(&pColVal->colVal)) {
colDataAppendNULL(pColInfoData, numOfRows);
} else {
varDataSetLen(pReader->transferBuf[slotId], colVal.value.nData);
memcpy(varDataVal(pReader->transferBuf[slotId]), colVal.value.pData, colVal.value.nData);
varDataSetLen(pReader->transferBuf[slotId], pVal->value.nData);
memcpy(varDataVal(pReader->transferBuf[slotId]), pVal->value.pData, pVal->value.nData);
colDataAppend(pColInfoData, numOfRows, pReader->transferBuf[slotId], false);
}
} else {
colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, !COL_VAL_IS_VALUE(&colVal));
colDataAppend(pColInfoData, numOfRows, (const char*)&pVal->value.val, !COL_VAL_IS_VALUE(pVal));
}
}
}
}
@ -118,35 +158,36 @@ void* tsdbCacherowsReaderClose(void* pReader) {
return NULL;
}
static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint64_t uid, STSRow** pRow,
static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint64_t uid, SArray** pRow,
LRUHandle** h) {
int32_t code = TSDB_CODE_SUCCESS;
if ((pr->type & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW) {
code = tsdbCacheGetLastrowH(lruCache, uid, pr->pVnode->pTsdb, h);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
*pRow = NULL;
// no data in the table of Uid
if (*h != NULL) {
*pRow = (STSRow*)taosLRUCacheValue(lruCache, *h);
}
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
code = tsdbCacheGetLastrowH(lruCache, uid, pr->pVnode->pTsdb, h);
} else {
code = tsdbCacheGetLastH(lruCache, uid, pr->pVnode->pTsdb, h);
}
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// no data in the table of Uid
if (*h != NULL) {
SArray* pLast = (SArray*)taosLRUCacheValue(lruCache, *h);
tsdbCacheLastArray2Row(pLast, pRow, pr->pSchema);
}
*pRow = (SArray*)taosLRUCacheValue(lruCache, *h);
}
return code;
}
static void freeItem(void* pItem) {
SLastCol* pCol = (SLastCol*) pItem;
if (IS_VAR_DATA_TYPE(pCol->colVal.type)) {
taosMemoryFree(pCol->colVal.value.pData);
}
}
int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) {
if (pReader == NULL || pResBlock == NULL) {
return TSDB_CODE_INVALID_PARA;
@ -157,13 +198,41 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
int32_t code = TSDB_CODE_SUCCESS;
SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache;
LRUHandle* h = NULL;
STSRow* pRow = NULL;
SArray* pRow = NULL;
size_t numOfTables = taosArrayGetSize(pr->pTableList);
bool hasRes = false;
SArray* pLastCols = NULL;
void** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES);
if (pRes == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
for (int32_t j = 0; j < pr->numOfCols; ++j) {
pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[slotIds[j]].bytes + VARSTR_HEADER_SIZE);
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]);
p->ts = INT64_MIN;
}
pLastCols = taosArrayInit(pr->pSchema->numOfCols, sizeof(SLastCol));
if (pLastCols == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
for (int32_t i = 0; i < pr->pSchema->numOfCols; ++i) {
struct STColumn* pCol = &pr->pSchema->columns[i];
SLastCol p = {.ts = INT64_MIN, .colVal.type = pCol->type};
if (IS_VAR_DATA_TYPE(pCol->type)) {
p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char));
}
taosArrayPush(pLastCols, &p);
}
// retrieve the only one last row of all tables in the uid list.
if ((pr->type & CACHESCAN_RETRIEVE_TYPE_SINGLE) == CACHESCAN_RETRIEVE_TYPE_SINGLE) {
int64_t lastKey = INT64_MIN;
bool internalResult = false;
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
@ -176,23 +245,59 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
continue;
}
if (pRow->ts > lastKey) {
// Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already
// appended or not.
if (internalResult) {
pResBlock->info.rows -= 1;
taosArrayClear(pTableUidList);
{
for (int32_t k = 0; k < pr->numOfCols; ++k) {
int32_t slotId = slotIds[k];
if (slotId == -1) { // the primary timestamp
SLastCol* p = taosArrayGet(pLastCols, 0);
SLastCol* pCol = (SLastCol*)taosArrayGet(pRow, 0);
if (pCol->ts > p->ts) {
hasRes = true;
p->ts = pCol->ts;
p->colVal = pCol->colVal;
// only set value for last row query
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
if (taosArrayGetSize(pTableUidList) == 0) {
taosArrayPush(pTableUidList, &pKeyInfo->uid);
} else {
taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
}
}
}
} else {
SLastCol* p = taosArrayGet(pLastCols, slotId);
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
if (pColVal->ts > p->ts) {
if (!COL_VAL_IS_VALUE(&pColVal->colVal) && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
continue;
}
saveOneRow(pRow, pResBlock, pr, slotIds);
taosArrayPush(pTableUidList, &pKeyInfo->uid);
internalResult = true;
lastKey = pRow->ts;
hasRes = true;
p->ts = pColVal->ts;
uint8_t* px = p->colVal.value.pData;
p->colVal = pColVal->colVal;
if (COL_VAL_IS_VALUE(&pColVal->colVal) && IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
p->colVal.value.pData = px;
memcpy(px, pColVal->colVal.value.pData, pColVal->colVal.value.nData);
}
}
}
}
}
tsdbCacheRelease(lruCache, h);
}
} else if ((pr->type & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) {
if (hasRes) {
saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes);
}
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
for (int32_t i = pr->tableIndex; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(pr->pTableList, i);
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
@ -204,19 +309,27 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
continue;
}
saveOneRow(pRow, pResBlock, pr, slotIds);
taosArrayPush(pTableUidList, &pKeyInfo->uid);
saveOneRow(pRow, pResBlock, pr, slotIds, pRes);
// TODO reset the pRes
taosArrayPush(pTableUidList, &pKeyInfo->uid);
tsdbCacheRelease(lruCache, h);
pr->tableIndex += 1;
if (pResBlock->info.rows >= pResBlock->info.capacity) {
return TSDB_CODE_SUCCESS;
goto _end;
}
}
} else {
return TSDB_CODE_INVALID_PARA;
code = TSDB_CODE_INVALID_PARA;
}
return TSDB_CODE_SUCCESS;
_end:
for (int32_t j = 0; j < pr->numOfCols; ++j) {
taosMemoryFree(pRes[j]);
}
taosMemoryFree(pRes);
taosArrayDestroyEx(pLastCols, freeItem);
return code;
}

View File

@ -15,11 +15,9 @@
#include "vnd.h"
int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN];
// TODO: check if directory exists
char dir[TSDB_FILENAME_LEN] = {0};
// check config
if (vnodeCheckCfg(pCfg) < 0) {
@ -56,18 +54,60 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
}
vInfo("vgId:%d, vnode is created", info.config.vgId);
return 0;
}
void vnodeDestroy(const char *path, STfs *pTfs) { tfsRmdir(pTfs, path); }
int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) {
SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN] = {0};
int32_t ret = 0;
if (pTfs) {
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path);
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", path);
}
ret = vnodeLoadInfo(dir, &info);
if (ret < 0) {
vError("vgId:%d, failed to read vnode config from %s since %s", pReq->vgId, path, tstrerror(terrno));
return -1;
}
SSyncCfg *pCfg = &info.config.syncCfg;
pCfg->myIndex = pReq->selfIndex;
pCfg->replicaNum = pReq->replica;
memset(&pCfg->nodeInfo, 0, sizeof(pCfg->nodeInfo));
vInfo("vgId:%d, save config, replicas:%d selfIndex:%d", pReq->vgId, pCfg->replicaNum, pCfg->myIndex);
for (int i = 0; i < pReq->replica; ++i) {
SNodeInfo *pNode = &pCfg->nodeInfo[i];
pNode->nodePort = pReq->replicas[i].port;
tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
vInfo("vgId:%d, save config, replica:%d ep:%s:%u", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort);
}
ret = vnodeSaveInfo(dir, &info);
if (ret < 0) {
vError("vgId:%d, failed to save vnode config since %s", pReq->vgId, tstrerror(terrno));
return -1;
}
vInfo("vgId:%d, vnode config is saved", info.config.vgId);
return 0;
}
void vnodeDestroy(const char *path, STfs *pTfs) {
vInfo("path:%s is removed while destroy vnode", path);
tfsRmdir(pTfs, path);
}
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
SVnode *pVnode = NULL;
SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN];
char tdir[TSDB_FILENAME_LEN * 2];
int ret;
char dir[TSDB_FILENAME_LEN] = {0};
char tdir[TSDB_FILENAME_LEN * 2] = {0};
int32_t ret = 0;
if (pTfs) {
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path);
@ -85,7 +125,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
}
// create handle
pVnode = (SVnode *)taosMemoryCalloc(1, sizeof(*pVnode) + strlen(path) + 1);
pVnode = taosMemoryCalloc(1, sizeof(*pVnode) + strlen(path) + 1);
if (pVnode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
vError("vgId:%d, failed to open vnode since %s", info.config.vgId, tstrerror(terrno));

View File

@ -1068,20 +1068,20 @@ static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, vo
}
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SAlterVnodeReq req = {0};
bool walChanged = false;
bool tsdbChanged = false;
if (tDeserializeSAlterVnodeReq(pReq, len, &req) != 0) {
SAlterVnodeConfigReq req = {0};
if (tDeserializeSAlterVnodeConfigReq(pReq, len, &req) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return TSDB_CODE_INVALID_MSG;
}
vInfo("vgId:%d, start to alter vnode config, page:%d pageSize:%d buffer:%d szPage:%d szBuf:%" PRIu64
" cacheLast:%d cacheLastSize:%d days:%d keep0:%d keep1:%d keep2:%d fsync:%d level:%d strict:%d",
" cacheLast:%d cacheLastSize:%d days:%d keep0:%d keep1:%d keep2:%d fsync:%d level:%d",
TD_VID(pVnode), req.pages, req.pageSize, req.buffer, req.pageSize * 1024, (uint64_t)req.buffer * 1024 * 1024,
req.cacheLast, req.cacheLastSize, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
req.walFsyncPeriod, req.walLevel, req.strict);
req.walFsyncPeriod, req.walLevel);
if (pVnode->config.cacheLastSize != req.cacheLastSize) {
pVnode->config.cacheLastSize = req.cacheLastSize;

View File

@ -20,7 +20,7 @@
static inline bool vnodeIsMsgBlock(tmsg_t type) {
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_REPLICA);
(type == TDMT_VND_UPDATE_TAG_VAL);
}
static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; }
@ -53,76 +53,6 @@ static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
}
}
static int32_t vnodeSetStandBy(SVnode *pVnode) {
vInfo("vgId:%d, start to set standby", TD_VID(pVnode));
if (syncSetStandby(pVnode->sync) == 0) {
vInfo("vgId:%d, set standby success", TD_VID(pVnode));
return 0;
} else if (terrno != TSDB_CODE_SYN_IS_LEADER) {
vError("vgId:%d, failed to set standby since %s", TD_VID(pVnode), terrstr());
return -1;
}
vInfo("vgId:%d, start to transfer leader", TD_VID(pVnode));
if (syncLeaderTransfer(pVnode->sync) != 0) {
vError("vgId:%d, failed to transfer leader since:%s", TD_VID(pVnode), terrstr());
return -1;
} else {
vInfo("vgId:%d, transfer leader success", TD_VID(pVnode));
}
if (syncSetStandby(pVnode->sync) == 0) {
vInfo("vgId:%d, set standby success", TD_VID(pVnode));
return 0;
} else {
vError("vgId:%d, failed to set standby after leader transfer since %s", TD_VID(pVnode), terrstr());
return -1;
}
}
static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
SAlterVnodeReq req = {0};
if (tDeserializeSAlterVnodeReq((char *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead), &req) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return TSDB_CODE_INVALID_MSG;
}
const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, start to alter vnode replica to %d, handle:%p", TD_VID(pVnode), req.replica, pMsg->info.handle);
SSyncCfg cfg = {.replicaNum = req.replica, .myIndex = req.selfIndex};
for (int32_t r = 0; r < req.replica; ++r) {
SNodeInfo *pNode = &cfg.nodeInfo[r];
tstrncpy(pNode->nodeFqdn, req.replicas[r].fqdn, sizeof(pNode->nodeFqdn));
pNode->nodePort = req.replicas[r].port;
vInfo("vgId:%d, replica:%d %s:%u", TD_VID(pVnode), r, pNode->nodeFqdn, pNode->nodePort);
}
SRpcMsg rpcMsg = {.info = pMsg->info};
if (syncReconfigBuild(pVnode->sync, &cfg, &rpcMsg) != 0) {
vError("vgId:%d, failed to build reconfig msg since %s", TD_VID(pVnode), terrstr());
return -1;
}
int32_t code = syncPropose(pVnode->sync, &rpcMsg, false);
if (code != 0) {
if (terrno != 0) code = terrno;
vInfo("vgId:%d, failed to propose reconfig msg since %s", TD_VID(pVnode), terrstr());
if (terrno == TSDB_CODE_SYN_IS_LEADER) {
if (syncLeaderTransfer(pVnode->sync) != 0) {
vError("vgId:%d, failed to transfer leader since %s", TD_VID(pVnode), terrstr());
} else {
vInfo("vgId:%d, transfer leader success", TD_VID(pVnode));
}
}
}
terrno = code;
return code;
}
void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg) {
SEpSet newEpSet = {0};
syncGetRetryEpSet(pVnode->sync, &newEpSet);
@ -169,24 +99,6 @@ static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code)
}
}
static void vnodeHandleAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = vnodeProcessAlterReplicaReq(pVnode, pMsg);
if (code > 0) {
ASSERT(0);
} else if (code == 0) {
vnodeWaitBlockMsg(pVnode, pMsg);
} else {
if (terrno != 0) code = terrno;
vnodeHandleProposeError(pVnode, pMsg, code);
}
const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
if (*arrSize <= 0) return;
@ -265,11 +177,6 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
continue;
}
if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) {
vnodeHandleAlterReplicaReq(pVnode, pMsg);
continue;
}
if (isBlock || BATCH_DISABLE) {
vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
}
@ -443,12 +350,6 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg);
syncSnapshotRspDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
code = vnodeSetStandBy(pVnode);
if (code != 0 && terrno != 0) code = terrno;
SRpcMsg rsp = {.code = code, .info = pMsg->info};
tmsgSendRsp(&rsp);
} else {
vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType);
code = -1;
@ -503,22 +404,7 @@ static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
return 0;
}
static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta *cbMeta) {
SVnode *pVnode = pFsm->data;
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
syncGetAndDelRespRpc(pVnode->sync, cbMeta->newCfgSeqNum, &rpcMsg.info);
rpcMsg.info.conn.applyIndex = cbMeta->index;
const STraceId *trace = (STraceId *)&pMsg->info.traceId;
vGTrace("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " handle:%p", TD_VID(pVnode),
TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta->seqNum, rpcMsg.info.handle);
if (rpcMsg.info.handle != NULL) {
tmsgSendRsp(&rpcMsg);
}
vnodePostBlockMsg(pVnode, pMsg);
}
static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta *cbMeta) {}
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
if (cbMeta.isWeak == 0) {

View File

@ -58,7 +58,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
// partition by tbname
if (taosArrayGetSize(pTableList->pGroupList) == taosArrayGetSize(pTableList->pTableList)) {
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | CACHESCAN_RETRIEVE_LAST_ROW;
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL|(pScanNode->ignoreNull? CACHESCAN_RETRIEVE_LAST:CACHESCAN_RETRIEVE_LAST_ROW);
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList,
taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader);
if (code != TSDB_CODE_SUCCESS) {
@ -68,7 +68,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false);
blockDataEnsureCapacity(pInfo->pBufferredRes, pOperator->resultInfo.capacity);
} else { // by tags
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE | CACHESCAN_RETRIEVE_LAST_ROW;
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE|(pScanNode->ignoreNull? CACHESCAN_RETRIEVE_LAST:CACHESCAN_RETRIEVE_LAST_ROW);
}
if (pScanNode->scan.pScanPseudoCols != NULL) {
@ -198,11 +198,14 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
if (pInfo->pRes->info.rows > 0) {
if (pInfo->pseudoExprSup.numOfExprs > 0) {
SExprSupp* pSup = &pInfo->pseudoExprSup;
pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0);
STableKeyInfo* pKeyInfo = taosArrayGet(pGroupTableList, 0);
pInfo->pRes->info.groupId = pKeyInfo->groupId;
if (taosArrayGetSize(pInfo->pUidList) > 0) {
ASSERT((pInfo->retrieveType & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW);
pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0);
code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
@ -210,6 +213,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
return NULL;
}
}
}
pInfo->pLastrowReader = tsdbCacherowsReaderClose(pInfo->pLastrowReader);
return pInfo->pRes;

View File

@ -5475,9 +5475,11 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
}
SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode;
int32_t code = TSDB_CODE_SUCCESS;
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
ASSERT(numOfCols > 0);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
SInterval interval = {
.interval = pIntervalPhyNode->interval,
@ -5487,6 +5489,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
.offset = pIntervalPhyNode->offset,
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
};
STimeWindowAggSupp twAggSupp = {
.waterMark = pIntervalPhyNode->window.watermark,
.calTrigger = pIntervalPhyNode->window.triggerType,
@ -5494,6 +5497,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
.minTs = INT64_MAX,
.deleteMark = INT64_MAX,
};
ASSERT(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY);
pOperator->pTaskInfo = pTaskInfo;
pInfo->interval = interval;
@ -5501,16 +5505,25 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
pInfo->isFinal = false;
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
initResultSizeInfo(&pOperator->resultInfo, 4096);
SExprSupp* pSup = &pOperator->exprSupp;
initBasicInfo(&pInfo->binfo, pResBlock);
initStreamFunciton(pSup->pCtx, pSup->numOfExprs);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
initResultSizeInfo(&pOperator->resultInfo, 4096);
if (pIntervalPhyNode->window.pExprs != NULL) {
int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}

View File

@ -2405,6 +2405,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = cachedLastRowFunction,
.finalizeFunc = firstLastFinalize
},
{
.name = "_cache_last",
.type = FUNCTION_TYPE_CACHE_LAST,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.translateFunc = translateFirstLast,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
.processFunc = lastFunctionMerge,
.finalizeFunc = firstLastFinalize
},
{
.name = "_last_row_partial",
.type = FUNCTION_TYPE_LAST_PARTIAL,

View File

@ -57,16 +57,6 @@ typedef struct SAvgRes {
int16_t type; // store the original input type, used in merge function
} SAvgRes;
typedef struct STuplePos {
union {
struct {
int32_t pageId;
int32_t offset;
};
STupleKey streamTupleKey;
};
} STuplePos;
typedef struct SMinmaxResInfo {
bool assign; // assign the first value or not
int64_t v;
@ -93,17 +83,6 @@ typedef struct STopBotRes {
STopBotResItem* pItems;
} STopBotRes;
typedef struct SFirstLastRes {
bool hasResult;
// used for last_row function only, isNullRes in SResultRowEntry can not be passed to downstream.So,
// this attribute is required
bool isNull;
int32_t bytes;
int64_t ts;
STuplePos pos;
char buf[];
} SFirstLastRes;
typedef struct SStddevRes {
double result;
int64_t count;
@ -1163,13 +1142,13 @@ static int32_t findRowIndex(int32_t start, int32_t num, SColumnInfoData* pCol, c
// the data is loaded, not only the block SMA value
for (int32_t i = start; i < num + start; ++i) {
char* p = colDataGetData(pCol, i);
if (memcpy((void*)tval, p, pCol->info.bytes) == 0) {
if (memcmp((void*)tval, p, pCol->info.bytes) == 0) {
return i;
}
}
ASSERT(0);
return 0;
// if reach here means real data of block SMA is not set in pCtx->input.
return -1;
}
int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
@ -1211,8 +1190,10 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
pBuf->v = *(int64_t*)tval;
if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) {
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL);
}
}
} else {
if (IS_SIGNED_NUMERIC_TYPE(type)) {
int64_t prev = 0;
@ -1223,9 +1204,11 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
*(int64_t*)&pBuf->v = val;
if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) {
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL);
}
}
}
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
uint64_t prev = 0;
GET_TYPED_DATA(prev, uint64_t, type, &pBuf->v);
@ -1235,9 +1218,11 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
*(uint64_t*)&pBuf->v = val;
if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) {
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL);
}
}
}
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
double prev = 0;
GET_TYPED_DATA(prev, double, type, &pBuf->v);
@ -1247,9 +1232,11 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
*(double*)&pBuf->v = val;
if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) {
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL);
}
}
}
} else if (type == TSDB_DATA_TYPE_FLOAT) {
float prev = 0;
GET_TYPED_DATA(prev, float, type, &pBuf->v);
@ -1261,10 +1248,12 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) {
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL);
}
}
}
}
pBuf->assign = true;
return numOfElems;

View File

@ -16,6 +16,7 @@
#include "functionMgt.h"
#include "builtins.h"
#include "builtinsimpl.h"
#include "functionMgtInt.h"
#include "taos.h"
#include "taoserror.h"
@ -314,6 +315,11 @@ bool fmIsSameInOutType(int32_t funcId) {
return res;
}
void getLastCacheDataType(SDataType* pType) {
pType->bytes = getFirstLastInfoSize(pType->bytes) + VARSTR_HEADER_SIZE;
pType->type = TSDB_DATA_TYPE_BINARY;
}
static int32_t getFuncInfo(SFunctionNode* pFunc) {
char msg[128] = {0};
return fmGetFuncInfo(pFunc, msg, sizeof(msg));

View File

@ -1969,7 +1969,7 @@ static int32_t msgToPhysiScanNode(STlvDecoder* pDecoder, void* pObj) {
return code;
}
enum { PHY_LAST_ROW_SCAN_CODE_SCAN = 1, PHY_LAST_ROW_SCAN_CODE_GROUP_TAGS, PHY_LAST_ROW_SCAN_CODE_GROUP_SORT };
enum { PHY_LAST_ROW_SCAN_CODE_SCAN = 1, PHY_LAST_ROW_SCAN_CODE_GROUP_TAGS, PHY_LAST_ROW_SCAN_CODE_GROUP_SORT, PHY_LAST_ROW_SCAN_CODE_IGNULL };
static int32_t physiLastRowScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SLastRowScanPhysiNode* pNode = (const SLastRowScanPhysiNode*)pObj;
@ -1981,6 +1981,9 @@ static int32_t physiLastRowScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_LAST_ROW_SCAN_CODE_GROUP_SORT, pNode->groupSort);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_LAST_ROW_SCAN_CODE_IGNULL, pNode->ignoreNull);
}
return code;
}
@ -2001,6 +2004,9 @@ static int32_t msgToPhysiLastRowScanNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_LAST_ROW_SCAN_CODE_GROUP_SORT:
code = tlvDecodeBool(pTlv, &pNode->groupSort);
break;
case PHY_LAST_ROW_SCAN_CODE_IGNULL:
code = tlvDecodeBool(pTlv, &pNode->ignoreNull);
break;
default:
break;
}

View File

@ -224,8 +224,8 @@ alter_db_option(A) ::= WAL_FSYNC_PERIOD NK_INTEGER(B).
alter_db_option(A) ::= KEEP integer_list(B). { A.type = DB_OPTION_KEEP; A.pList = B; }
alter_db_option(A) ::= KEEP variable_list(B). { A.type = DB_OPTION_KEEP; A.pList = B; }
alter_db_option(A) ::= PAGES NK_INTEGER(B). { A.type = DB_OPTION_PAGES; A.val = B; }
//alter_db_option(A) ::= REPLICA NK_INTEGER(B). { A.type = DB_OPTION_REPLICA; A.val = B; }
//alter_db_option(A) ::= STRICT NK_STRING(B). { A.type = DB_OPTION_STRICT; A.val = B; }
alter_db_option(A) ::= REPLICA NK_INTEGER(B). { A.type = DB_OPTION_REPLICA; A.val = B; }
alter_db_option(A) ::= STRICT NK_STRING(B). { A.type = DB_OPTION_STRICT; A.val = B; }
alter_db_option(A) ::= WAL_LEVEL NK_INTEGER(B). { A.type = DB_OPTION_WAL; A.val = B; }
alter_db_option(A) ::= STT_TRIGGER NK_INTEGER(B). { A.type = DB_OPTION_STT_TRIGGER; A.val = B; }

File diff suppressed because it is too large Load Diff

View File

@ -2139,6 +2139,46 @@ static int32_t rewriteUniqueOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog
return rewriteUniqueOptimizeImpl(pCxt, pLogicSubplan, pIndef);
}
typedef struct SLastRowScanOptLastParaCkCxt {
bool hasTag;
bool hasCol;
} SLastRowScanOptLastParaCkCxt;
static EDealRes lastRowScanOptLastParaCheckImpl(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
SLastRowScanOptLastParaCkCxt* pCxt = pContext;
if (COLUMN_TYPE_TAG == ((SColumnNode*)pNode)->colType || COLUMN_TYPE_TBNAME == ((SColumnNode*)pNode)->colType) {
pCxt->hasTag = true;
} else {
pCxt->hasCol = true;
}
return DEAL_RES_END;
}
return DEAL_RES_CONTINUE;
}
static bool lastRowScanOptLastParaCheck(SNode* pExpr) {
SLastRowScanOptLastParaCkCxt cxt = {.hasTag = false, .hasCol = false};
nodesWalkExpr(pExpr, lastRowScanOptLastParaCheckImpl, &cxt);
return !cxt.hasTag && cxt.hasCol;
}
static bool hasSuitableCache(int8_t cacheLastMode, bool hasLastRow, bool hasLast) {
switch (cacheLastMode) {
case TSDB_CACHE_MODEL_NONE:
return false;
case TSDB_CACHE_MODEL_LAST_ROW:
return hasLastRow;
case TSDB_CACHE_MODEL_LAST_VALUE:
return hasLast;
case TSDB_CACHE_MODEL_BOTH:
return true;
default:
break;
}
return false;
}
static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) ||
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) {
@ -2149,16 +2189,27 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0);
// Only one of LAST and LASTROW can appear
if (pAgg->hasLastRow == pAgg->hasLast || NULL != pAgg->pGroupKeys || NULL != pScan->node.pConditions ||
0 == pScan->cacheLastMode || IS_TSWINDOW_SPECIFIED(pScan->scanRange)) {
!hasSuitableCache(pScan->cacheLastMode, pAgg->hasLastRow, pAgg->hasLast) ||
IS_TSWINDOW_SPECIFIED(pScan->scanRange)) {
return false;
}
bool hasLastFunc = false;
bool hasSelectFunc = false;
SNode* pFunc = NULL;
FOREACH(pFunc, ((SAggLogicNode*)pNode)->pAggFuncs) {
if (FUNCTION_TYPE_LAST_ROW != ((SFunctionNode*)pFunc)->funcType &&
// FUNCTION_TYPE_LAST != ((SFunctionNode*)pFunc)->funcType &&
FUNCTION_TYPE_SELECT_VALUE != ((SFunctionNode*)pFunc)->funcType &&
FUNCTION_TYPE_GROUP_KEY != ((SFunctionNode*)pFunc)->funcType) {
SFunctionNode* pAggFunc = (SFunctionNode*)pFunc;
if (FUNCTION_TYPE_LAST == pAggFunc->funcType) {
if (hasSelectFunc || !lastRowScanOptLastParaCheck(nodesListGetNode(pAggFunc->pParameterList, 0))) {
return false;
}
hasLastFunc = true;
} else if (FUNCTION_TYPE_SELECT_VALUE == pAggFunc->funcType || FUNCTION_TYPE_GROUP_KEY == pAggFunc->funcType) {
if (hasLastFunc) {
return false;
}
hasSelectFunc = true;
} else if (FUNCTION_TYPE_LAST_ROW != pAggFunc->funcType) {
return false;
}
}
@ -2166,6 +2217,31 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
return true;
}
typedef struct SLastRowScanOptSetColDataTypeCxt {
bool doAgg;
SNodeList* pLastCols;
} SLastRowScanOptSetColDataTypeCxt;
static EDealRes lastRowScanOptSetColDataType(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
SLastRowScanOptSetColDataTypeCxt* pCxt = pContext;
if (pCxt->doAgg) {
nodesListMakeAppend(&pCxt->pLastCols, pNode);
getLastCacheDataType(&(((SColumnNode*)pNode)->node.resType));
} else {
SNode* pCol = NULL;
FOREACH(pCol, pCxt->pLastCols) {
if (nodesEqualNode(pCol, pNode)) {
getLastCacheDataType(&(((SColumnNode*)pNode)->node.resType));
break;
}
}
}
return DEAL_RES_IGNORE_CHILD;
}
return DEAL_RES_CONTINUE;
}
static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SAggLogicNode* pAgg = (SAggLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, lastRowScanOptMayBeOptimized);
@ -2173,22 +2249,36 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
return TSDB_CODE_SUCCESS;
}
SLastRowScanOptSetColDataTypeCxt cxt = {.doAgg = true, .pLastCols = NULL};
SNode* pNode = NULL;
FOREACH(pNode, pAgg->pAggFuncs) {
SFunctionNode* pFunc = (SFunctionNode*)pNode;
if (FUNCTION_TYPE_LAST_ROW == pFunc->funcType || FUNCTION_TYPE_LAST == pFunc->funcType) {
int32_t len = snprintf(pFunc->functionName, sizeof(pFunc->functionName), "_cache_last_row");
int32_t funcType = pFunc->funcType;
if (FUNCTION_TYPE_LAST_ROW == funcType || FUNCTION_TYPE_LAST == funcType) {
int32_t len = snprintf(pFunc->functionName, sizeof(pFunc->functionName),
FUNCTION_TYPE_LAST_ROW == funcType ? "_cache_last_row" : "_cache_last");
pFunc->functionName[len] = '\0';
int32_t code = fmGetFuncInfo(pFunc, NULL, 0);
if (TSDB_CODE_SUCCESS != code) {
nodesClearList(cxt.pLastCols);
return code;
}
if (FUNCTION_TYPE_LAST == funcType) {
nodesWalkExpr(nodesListGetNode(pFunc->pParameterList, 0), lastRowScanOptSetColDataType, &cxt);
}
}
}
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0);
pScan->scanType = SCAN_TYPE_LAST_ROW;
pScan->igLastNull = pAgg->hasLast ? true : false;
if (NULL != cxt.pLastCols) {
cxt.doAgg = false;
nodesWalkExprs(pScan->pScanCols, lastRowScanOptSetColDataType, &cxt);
nodesWalkExprs(pScan->pScanPseudoCols, lastRowScanOptSetColDataType, &cxt);
nodesWalkExprs(pScan->node.pTargets, lastRowScanOptSetColDataType, &cxt);
nodesClearList(cxt.pLastCols);
}
pAgg->hasLastRow = false;
pAgg->hasLast = false;

View File

@ -105,24 +105,6 @@ TEST_F(PlanBasicTest, interpFunc) {
run("SELECT _IROWTS, INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') EVERY(5s) FILL(LINEAR)");
}
TEST_F(PlanBasicTest, lastRowFunc) {
useDb("root", "cache_db");
run("SELECT LAST_ROW(c1) FROM t1");
run("SELECT LAST_ROW(*) FROM t1");
run("SELECT LAST_ROW(c1, c2) FROM t1");
run("SELECT LAST_ROW(c1), c2 FROM t1");
run("SELECT LAST_ROW(c1) FROM st1");
run("SELECT LAST_ROW(c1) FROM st1 PARTITION BY TBNAME");
run("SELECT LAST_ROW(c1), SUM(c3) FROM t1");
}
TEST_F(PlanBasicTest, lastRowFuncWithoutCache) {
useDb("root", "test");

View File

@ -109,9 +109,28 @@ TEST_F(PlanOptimizeTest, mergeProjects) {
TEST_F(PlanOptimizeTest, pushDownProjectCond) {
useDb("root", "test");
run("select 1-abs(c1) from (select unique(c1) c1 from st1s3) where 1-c1>5 order by 1 nulls first");
}
TEST_F(PlanOptimizeTest, LastRowScan) {
useDb("root", "cache_db");
run("SELECT LAST_ROW(c1), c2 FROM t1");
run("SELECT LAST_ROW(c1), c2, tag1, tbname FROM st1");
run("SELECT LAST_ROW(c1) FROM st1 PARTITION BY TBNAME");
run("SELECT LAST_ROW(c1), SUM(c3) FROM t1");
run("SELECT LAST_ROW(tag1) FROM st1");
run("SELECT LAST(c1) FROM st1");
run("SELECT LAST(c1), c2 FROM st1");
}
TEST_F(PlanOptimizeTest, tagScan) {
useDb("root", "test");
run("select tag1 from st1 group by tag1");

View File

@ -371,7 +371,7 @@ static int tdbDefaultKeyCmprFn(const void *pKey1, int keyLen1, const void *pKey2
}
return cret;
}
/*
static int tdbBtreeOpenImpl(SBTree *pBt) {
// Try to get the root page of the an existing btree
SPgno pgno;
@ -400,7 +400,7 @@ static int tdbBtreeOpenImpl(SBTree *pBt) {
pBt->root = pgno;
return 0;
}
*/
int tdbBtreeInitPage(SPage *pPage, void *arg, int init) {
SBTree *pBt;
u8 flags;

View File

@ -586,7 +586,7 @@ static inline void setLPageNFree(SPage *pPage, int nFree) {
// cell offset
static inline int getLPageCellOffset(SPage *pPage, int idx) {
ASSERT(idx >= 0 && idx < getPageCellNum(pPage));
ASSERT(idx >= 0 && idx < getLPageCellNum(pPage));
return TDB_GET_U24(pPage->pCellIdx + 3 * idx);
}

View File

@ -116,7 +116,7 @@ int tdbPagerClose(SPager *pPager) {
}
return 0;
}
/*
int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate, SBTree *pBt) {
SPgno pgno;
SPage *pPage;
@ -167,7 +167,7 @@ int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate, SBTree *pBt) {
*ppgno = pgno;
return 0;
}
*/
int tdbPagerWrite(SPager *pPager, SPage *pPage) {
int ret;
SPage **ppPage;

View File

@ -811,7 +811,7 @@ static bool addHandleToAcceptloop(void* arg) {
tError("failed to bind:%s", uv_err_name(err));
return false;
}
if ((err = uv_listen((uv_stream_t*)&srv->server, 512, uvOnAcceptCb)) != 0) {
if ((err = uv_listen((uv_stream_t*)&srv->server, 4096 * 2, uvOnAcceptCb)) != 0) {
tError("failed to listen:%s", uv_err_name(err));
terrno = TSDB_CODE_RPC_PORT_EADDRINUSE;
return false;

View File

@ -278,12 +278,12 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
wDebug("vgId:%d, wal starts to fetch body, index:%" PRId64, pRead->pWal->cfg.vgId, ver);
if (pRead->capacity < pReadHead->bodyLen) {
void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
if (ptr == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return -1;
}
pRead->pHead = (SWalCkHead *)ptr;
pRead->pHead = ptr;
pReadHead = &pRead->pHead->head;
pRead->capacity = pReadHead->bodyLen;
}
@ -398,12 +398,12 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
int64_t ver = pReadHead->version;
if (pRead->capacity < pReadHead->bodyLen) {
void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(*ppHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
if (ptr == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return -1;
}
*ppHead = (SWalCkHead *)ptr;
*ppHead = ptr;
pReadHead = &((*ppHead)->head);
pRead->capacity = pReadHead->bodyLen;
}
@ -493,13 +493,14 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
}
if (pReader->capacity < pReader->pHead->head.bodyLen) {
void *ptr = taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReader->pHead->head.bodyLen);
SWalCkHead *ptr =
(SWalCkHead *)taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReader->pHead->head.bodyLen);
if (ptr == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
taosThreadMutexUnlock(&pReader->mutex);
return -1;
}
pReader->pHead = (SWalCkHead *)ptr;
pReader->pHead = ptr;
pReader->capacity = pReader->pHead->head.bodyLen;
}

View File

@ -39,6 +39,7 @@ if $data02 != 5.000000000 then
return -1
endi
if $data03 != 3 then
print expect 3, actual: $data03
return -1
endi
if $data04 != @70-01-01 07:59:57.000@ then
@ -210,7 +211,7 @@ if $data01 != 6 then
return -1
endi
if $data02 != 37.000000000 then
print $data02
print expect 37.000000000 actual: $data02
return -1
endi
if $data03 != 27 then
@ -233,7 +234,7 @@ if $data01 != 6 then
return -1
endi
if $data02 != 37.000000000 then
print $data02
print expect 37.000000000, acutal: $data02
return -1
endi
if $data03 != 27 then

View File

@ -13,7 +13,7 @@ class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
tdSql.init(conn.cursor(), True)
self.tb_nums = 10
self.row_nums = 20
self.ts = 1434938400000

@ -1 +0,0 @@
Subproject commit 1bdfca396cd6730cdc334e06fc7b2156dd1239a0