Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/TS-4243-3.0
This commit is contained in:
commit
a6423b7816
|
@ -24,7 +24,7 @@ SELECT [hints] [DISTINCT] [TAGS] select_list
|
|||
hints: /*+ [hint([hint_param_list])] [hint([hint_param_list])] */
|
||||
|
||||
hint:
|
||||
BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP
|
||||
BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP | PARA_TABLES_SORT
|
||||
|
||||
select_list:
|
||||
select_expr [, select_expr] ...
|
||||
|
@ -87,12 +87,13 @@ Hints are a means of user control over query optimization for individual stateme
|
|||
|
||||
The list of currently supported Hints is as follows:
|
||||
|
||||
| **Hint** | **Params** | **Comment** | **Scopt** |
|
||||
| **Hint** | **Params** | **Comment** | **Scope** |
|
||||
| :-----------: | -------------- | -------------------------- | -----------------------------------|
|
||||
| BATCH_SCAN | None | Batch table scan | JOIN statment for stable |
|
||||
| NO_BATCH_SCAN | None | Sequential table scan | JOIN statment for stable |
|
||||
| SORT_FOR_GROUP| None | Use sort for partition, conflict with PARTITION_FIRST | With normal column in partition by list |
|
||||
| PARTITION_FIRST| None | Use Partition before aggregate, conflict with SORT_FOR_GROUP | With normal column in partition by list |
|
||||
| PARA_TABLES_SORT| None | When sorting the supertable rows by timestamp, No temporary disk space is used. When there are numerous tables, each with long rows, the corresponding algorithm associated with this prompt may consume a substantial amount of memory, potentially leading to an Out Of Memory (OOM) situation. | Sorting the supertable rows by timestamp |
|
||||
|
||||
For example:
|
||||
|
||||
|
@ -100,6 +101,7 @@ For example:
|
|||
SELECT /*+ BATCH_SCAN() */ a.ts FROM stable1 a, stable2 b where a.tag0 = b.tag0 and a.ts = b.ts;
|
||||
SELECT /*+ SORT_FOR_GROUP() */ count(*), c1 FROM stable1 PARTITION BY c1;
|
||||
SELECT /*+ PARTITION_FIRST() */ count(*), c1 FROM stable1 PARTITION BY c1;
|
||||
SELECT /*+ PARA_TABLES_SORT() */ * from stable1 order by ts;
|
||||
```
|
||||
|
||||
## Lists
|
||||
|
|
|
@ -24,7 +24,7 @@ SELECT [hints] [DISTINCT] [TAGS] select_list
|
|||
hints: /*+ [hint([hint_param_list])] [hint([hint_param_list])] */
|
||||
|
||||
hint:
|
||||
BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP
|
||||
BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP | PARA_TABLES_SORT
|
||||
|
||||
select_list:
|
||||
select_expr [, select_expr] ...
|
||||
|
@ -93,13 +93,14 @@ Hints 是用户控制单个语句查询优化的一种手段,当 Hint 不适
|
|||
| NO_BATCH_SCAN | 无 | 采用顺序读表的方式 | 超级表 JOIN 语句 |
|
||||
| SORT_FOR_GROUP| 无 | 采用sort方式进行分组, 与PARTITION_FIRST冲突 | partition by 列表有普通列时 |
|
||||
| PARTITION_FIRST| 无 | 在聚合之前使用PARTITION计算分组, 与SORT_FOR_GROUP冲突 | partition by 列表有普通列时 |
|
||||
|
||||
| PARA_TABLES_SORT| 无 | 超级表的数据按时间戳排序时, 不使用临时磁盘空间, 只使用内存。当子表数量多, 行长比较大时候, 会使用大量内存, 可能发生OOM | 超级表的数据按时间戳排序时 |
|
||||
举例:
|
||||
|
||||
```sql
|
||||
SELECT /*+ BATCH_SCAN() */ a.ts FROM stable1 a, stable2 b where a.tag0 = b.tag0 and a.ts = b.ts;
|
||||
SELECT /*+ SORT_FOR_GROUP() */ count(*), c1 FROM stable1 PARTITION BY c1;
|
||||
SELECT /*+ PARTITION_FIRST() */ count(*), c1 FROM stable1 PARTITION BY c1;
|
||||
SELECT /*+ PARA_TABLES_SORT() */ * from stable1 order by ts;
|
||||
```
|
||||
|
||||
## 列表
|
||||
|
|
|
@ -32,6 +32,9 @@ typedef struct SBlockOrderInfo {
|
|||
SColumnInfoData* pColData;
|
||||
} SBlockOrderInfo;
|
||||
|
||||
#define BLOCK_VERSION_1 1
|
||||
#define BLOCK_VERSION_2 2
|
||||
|
||||
#define NBIT (3u)
|
||||
#define BitPos(_n) ((_n) & ((1 << NBIT) - 1))
|
||||
#define BMCharPos(bm_, r_) ((bm_)[(r_) >> NBIT])
|
||||
|
|
|
@ -207,9 +207,6 @@ typedef enum _mgmt_table {
|
|||
#define TD_CHILD_TABLE TSDB_CHILD_TABLE
|
||||
#define TD_NORMAL_TABLE TSDB_NORMAL_TABLE
|
||||
|
||||
#define TD_REQ_FROM_APP 0
|
||||
#define TD_REQ_FROM_TAOX 1
|
||||
|
||||
typedef enum ENodeType {
|
||||
// Syntax nodes are used in parser and planner module, and some are also used in executor module, such as COLUMN,
|
||||
// VALUE, OPERATOR, FUNCTION and so on.
|
||||
|
@ -760,7 +757,7 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapperEx(SDecoder* pDecoder, SSchemaW
|
|||
typedef struct {
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
int8_t igExists;
|
||||
int8_t source; // 1-taosX or 0-taosClient
|
||||
int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient
|
||||
int8_t reserved[6];
|
||||
tb_uid_t suid;
|
||||
int64_t delay1;
|
||||
|
@ -803,7 +800,7 @@ void tFreeSMCreateStbRsp(SMCreateStbRsp* pRsp);
|
|||
typedef struct {
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
int8_t igNotExists;
|
||||
int8_t source; // 1-taosX or 0-taosClient
|
||||
int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient
|
||||
int8_t reserved[6];
|
||||
tb_uid_t suid;
|
||||
int32_t sqlLen;
|
||||
|
@ -1614,7 +1611,6 @@ typedef struct {
|
|||
SEp ep;
|
||||
char active[TSDB_ACTIVE_KEY_LEN];
|
||||
char connActive[TSDB_CONN_ACTIVE_KEY_LEN];
|
||||
char machineId[TSDB_MACHINE_ID_LEN + 1];
|
||||
} SDnodeInfo;
|
||||
|
||||
typedef struct {
|
||||
|
@ -2662,6 +2658,7 @@ typedef struct SVCreateStbReq {
|
|||
SRSmaParam rsmaParam;
|
||||
int32_t alterOriDataLen;
|
||||
void* alterOriData;
|
||||
int8_t source;
|
||||
} SVCreateStbReq;
|
||||
|
||||
int tEncodeSVCreateStbReq(SEncoder* pCoder, const SVCreateStbReq* pReq);
|
||||
|
@ -2731,6 +2728,7 @@ typedef struct {
|
|||
SVCreateTbReq* pReqs;
|
||||
SArray* pArray;
|
||||
};
|
||||
int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient
|
||||
} SVCreateTbBatchReq;
|
||||
|
||||
int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq);
|
||||
|
@ -2823,6 +2821,7 @@ typedef struct {
|
|||
int32_t newCommentLen;
|
||||
char* newComment;
|
||||
int64_t ctimeMs; // fill by vnode
|
||||
int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient
|
||||
} SVAlterTbReq;
|
||||
|
||||
int32_t tEncodeSVAlterTbReq(SEncoder* pEncoder, const SVAlterTbReq* pReq);
|
||||
|
@ -3920,12 +3919,13 @@ int32_t tDeatroySMqHbRsp(SMqHbRsp* pRsp);
|
|||
int32_t tSerializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
|
||||
int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
|
||||
|
||||
#define TD_REQ_FROM_APP 0x0
|
||||
#define SUBMIT_REQ_AUTO_CREATE_TABLE 0x1
|
||||
#define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2
|
||||
#define SUBMIT_REQ_FROM_FILE 0x4
|
||||
#define TD_REQ_FROM_TAOX 0x8
|
||||
|
||||
#define SOURCE_NULL 0
|
||||
#define SOURCE_TAOSX 1
|
||||
#define TD_REQ_FROM_TAOX_OLD 0x1 // for compatibility
|
||||
|
||||
typedef struct {
|
||||
int32_t flags;
|
||||
|
@ -3938,7 +3938,6 @@ typedef struct {
|
|||
SArray* aCol;
|
||||
};
|
||||
int64_t ctimeMs;
|
||||
int8_t source;
|
||||
} SSubmitTbData;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -22,6 +22,9 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#define TBASE_MAX_ILEN 4096
|
||||
#define TBASE_MAX_OLEN 5653
|
||||
|
||||
uint8_t *base58_decode(const char *value, size_t inlen, int32_t *outlen);
|
||||
char *base58_encode(const uint8_t *value, int32_t vlen);
|
||||
|
||||
|
|
|
@ -1850,7 +1850,7 @@ static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, i
|
|||
|
||||
char* pStart = p + len;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
int32_t colLen = (blockVersion == 1) ? htonl(colLength[i]) : colLength[i];
|
||||
int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
|
||||
|
||||
if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
|
||||
int32_t* offset = (int32_t*)pStart;
|
||||
|
@ -1949,8 +1949,8 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
|
|||
char* pStart = p;
|
||||
char* pStart1 = p1;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
int32_t colLen = (blockVersion == 1) ? htonl(colLength[i]) : colLength[i];
|
||||
int32_t colLen1 = (blockVersion == 1) ? htonl(colLength1[i]) : colLength1[i];
|
||||
int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
|
||||
int32_t colLen1 = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength1[i]) : colLength1[i];
|
||||
if (ASSERT(colLen < dataLen)) {
|
||||
tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen);
|
||||
return TSDB_CODE_TSC_INTERNAL_ERROR;
|
||||
|
@ -2009,7 +2009,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
|
|||
}
|
||||
colLen1 = len;
|
||||
totalLen += colLen1;
|
||||
colLength1[i] = (blockVersion == 1) ? htonl(len) : len;
|
||||
colLength1[i] = (blockVersion == BLOCK_VERSION_1) ? htonl(len) : len;
|
||||
} else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
|
||||
len = numOfRows * sizeof(int32_t);
|
||||
memcpy(pStart1, pStart, len);
|
||||
|
@ -2098,7 +2098,7 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
|
|||
|
||||
char* pStart = p;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
if(blockVersion == 1){
|
||||
if(blockVersion == BLOCK_VERSION_1){
|
||||
colLength[i] = htonl(colLength[i]);
|
||||
}
|
||||
if (colLength[i] >= dataLen) {
|
||||
|
|
|
@ -1001,6 +1001,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
|
||||
tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
|
||||
taosArrayPush(tBatch.req.pArray, pCreateReq);
|
||||
tBatch.req.source = TD_REQ_FROM_TAOX;
|
||||
|
||||
taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
|
||||
} else { // add to the correct vgroup
|
||||
|
@ -1276,7 +1277,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
return terrno;
|
||||
}
|
||||
SVAlterTbReq req = {0};
|
||||
SDecoder coder = {0};
|
||||
SDecoder dcoder = {0};
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SRequestObj* pRequest = NULL;
|
||||
SQuery* pQuery = NULL;
|
||||
|
@ -1297,8 +1298,8 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
// decode and process req
|
||||
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
|
||||
int32_t len = metaLen - sizeof(SMsgHead);
|
||||
tDecoderInit(&coder, data, len);
|
||||
if (tDecodeSVAlterTbReq(&coder, &req) < 0) {
|
||||
tDecoderInit(&dcoder, data, len);
|
||||
if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) {
|
||||
code = TSDB_CODE_INVALID_PARA;
|
||||
goto end;
|
||||
}
|
||||
|
@ -1340,14 +1341,36 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
goto end;
|
||||
}
|
||||
pVgData->vg = pInfo;
|
||||
pVgData->pData = taosMemoryMalloc(metaLen);
|
||||
if (NULL == pVgData->pData) {
|
||||
|
||||
int tlen = 0;
|
||||
req.source = TD_REQ_FROM_TAOX;
|
||||
tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code);
|
||||
if(code != 0){
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
memcpy(pVgData->pData, meta, metaLen);
|
||||
((SMsgHead*)pVgData->pData)->vgId = htonl(pInfo.vgId);
|
||||
pVgData->size = metaLen;
|
||||
tlen += sizeof(SMsgHead);
|
||||
void* pMsg = taosMemoryMalloc(tlen);
|
||||
if (NULL == pMsg) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
((SMsgHead*)pMsg)->vgId = htonl(pInfo.vgId);
|
||||
((SMsgHead*)pMsg)->contLen = htonl(tlen);
|
||||
void* pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
|
||||
SEncoder coder = {0};
|
||||
tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
|
||||
code = tEncodeSVAlterTbReq(&coder, &req);
|
||||
if(code != 0){
|
||||
tEncoderClear(&coder);
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
tEncoderClear(&coder);
|
||||
|
||||
pVgData->pData = pMsg;
|
||||
pVgData->size = tlen;
|
||||
|
||||
pVgData->numOfTables = 1;
|
||||
taosArrayPush(pArray, &pVgData);
|
||||
|
||||
|
@ -1387,7 +1410,7 @@ end:
|
|||
if (pVgData) taosMemoryFreeClear(pVgData->pData);
|
||||
taosMemoryFreeClear(pVgData);
|
||||
destroyRequest(pRequest);
|
||||
tDecoderClear(&coder);
|
||||
tDecoderClear(&dcoder);
|
||||
qDestroyQuery(pQuery);
|
||||
terrno = code;
|
||||
return code;
|
||||
|
|
|
@ -389,7 +389,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
|||
}
|
||||
}
|
||||
if (strcasecmp(key, "msg.consume.excluded") == 0) {
|
||||
conf->sourceExcluded = taosStr2int64(value);
|
||||
conf->sourceExcluded = (taosStr2int64(value) != 0) ? TD_REQ_FROM_TAOX : 0;
|
||||
return TMQ_CONF_OK;
|
||||
}
|
||||
|
||||
|
@ -1611,17 +1611,39 @@ SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
|
|||
return pRspObj;
|
||||
}
|
||||
|
||||
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
|
||||
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
|
||||
pRspObj->resType = RES_TYPE__TMQ;
|
||||
|
||||
void changeByteEndian(char* pData){
|
||||
char* p = pData;
|
||||
|
||||
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length |
|
||||
// version:
|
||||
int32_t blockVersion = *(int32_t*)p;
|
||||
ASSERT(blockVersion == BLOCK_VERSION_1);
|
||||
*(int32_t*)p = BLOCK_VERSION_2;
|
||||
|
||||
p += sizeof(int32_t);
|
||||
p += sizeof(int32_t);
|
||||
p += sizeof(int32_t);
|
||||
int32_t cols = *(int32_t*)p;
|
||||
p += sizeof(int32_t);
|
||||
p += sizeof(int32_t);
|
||||
p += sizeof(uint64_t);
|
||||
// check fields
|
||||
p += cols * (sizeof(int8_t) + sizeof(int32_t));
|
||||
|
||||
int32_t* colLength = (int32_t*)p;
|
||||
for (int32_t i = 0; i < cols; ++i) {
|
||||
colLength[i] = htonl(colLength[i]);
|
||||
}
|
||||
}
|
||||
|
||||
static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, SMqRspObj* pRspObj) {
|
||||
(*numOfRows) = 0;
|
||||
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
|
||||
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
|
||||
|
||||
pRspObj->vgId = pWrapper->vgHandle->vgId;
|
||||
pRspObj->resIter = -1;
|
||||
memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));
|
||||
|
||||
pRspObj->resInfo.totalRows = 0;
|
||||
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
|
||||
|
@ -1633,41 +1655,44 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg,
|
|||
}
|
||||
// extract the rows in this data packet
|
||||
for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
|
||||
SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, i);
|
||||
int64_t rows = htobe64(pRetrieve->numOfRows);
|
||||
void* pRetrieve = taosArrayGetP(pRspObj->rsp.blockData, i);
|
||||
void* rawData = NULL;
|
||||
int64_t rows = 0;
|
||||
// deal with compatibility
|
||||
if(*(int64_t*)pRetrieve == 0){
|
||||
rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
|
||||
rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows);
|
||||
}else if(*(int64_t*)pRetrieve == 1){
|
||||
rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
|
||||
rows = htobe64(((SRetrieveTableRspForTmq*)pRetrieve)->numOfRows);
|
||||
}
|
||||
|
||||
pVg->numOfRows += rows;
|
||||
(*numOfRows) += rows;
|
||||
|
||||
if (needTransformSchema) { // withSchema is false if subscribe subquery, true if subscribe db or stable
|
||||
SSchemaWrapper* schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema);
|
||||
if (schema) {
|
||||
changeByteEndian(rawData);
|
||||
if (needTransformSchema) { //withSchema is false if subscribe subquery, true if subscribe db or stable
|
||||
SSchemaWrapper *schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema);
|
||||
if(schema){
|
||||
taosArrayPush(pRspObj->rsp.blockSchema, &schema);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
|
||||
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
|
||||
pRspObj->resType = RES_TYPE__TMQ;
|
||||
memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));
|
||||
tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, pRspObj);
|
||||
return pRspObj;
|
||||
}
|
||||
|
||||
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
|
||||
SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
|
||||
pRspObj->resType = RES_TYPE__TMQ_METADATA;
|
||||
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
|
||||
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
|
||||
pRspObj->vgId = pWrapper->vgHandle->vgId;
|
||||
pRspObj->resIter = -1;
|
||||
memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));
|
||||
|
||||
pRspObj->resInfo.totalRows = 0;
|
||||
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
|
||||
|
||||
// extract the rows in this data packet
|
||||
for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
|
||||
SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, i);
|
||||
int64_t rows = htobe64(pRetrieve->numOfRows);
|
||||
pVg->numOfRows += rows;
|
||||
(*numOfRows) += rows;
|
||||
}
|
||||
tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, (SMqRspObj*)pRspObj);
|
||||
return pRspObj;
|
||||
}
|
||||
|
||||
|
|
|
@ -2197,7 +2197,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
|
|||
|
||||
// todo extract method
|
||||
int32_t* version = (int32_t*)data;
|
||||
*version = 2;
|
||||
*version = BLOCK_VERSION_1;
|
||||
data += sizeof(int32_t);
|
||||
|
||||
int32_t* actualLen = (int32_t*)data;
|
||||
|
@ -2278,7 +2278,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
|
|||
data += colSizes[col];
|
||||
}
|
||||
|
||||
// colSizes[col] = htonl(colSizes[col]);
|
||||
colSizes[col] = htonl(colSizes[col]);
|
||||
// uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type,
|
||||
// htonl(colSizes[col]), colSizes[col]);
|
||||
}
|
||||
|
@ -2343,9 +2343,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) {
|
|||
pStart += sizeof(int32_t) * numOfCols;
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
if(version == 1){
|
||||
colLen[i] = htonl(colLen[i]);
|
||||
}
|
||||
colLen[i] = htonl(colLen[i]);
|
||||
ASSERT(colLen[i] >= 0);
|
||||
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
|
|
|
@ -7513,6 +7513,7 @@ int tEncodeSVCreateStbReq(SEncoder *pCoder, const SVCreateStbReq *pReq) {
|
|||
if (pReq->alterOriDataLen > 0) {
|
||||
if (tEncodeBinary(pCoder, pReq->alterOriData, pReq->alterOriDataLen) < 0) return -1;
|
||||
}
|
||||
if (tEncodeI8(pCoder, pReq->source) < 0) return -1;
|
||||
|
||||
tEndEncode(pCoder);
|
||||
return 0;
|
||||
|
@ -7535,6 +7536,10 @@ int tDecodeSVCreateStbReq(SDecoder *pCoder, SVCreateStbReq *pReq) {
|
|||
if (tDecodeBinary(pCoder, (uint8_t **)&pReq->alterOriData, NULL) < 0) return -1;
|
||||
}
|
||||
|
||||
if (!tDecodeIsEnd(pCoder)) {
|
||||
if (tDecodeI8(pCoder, &pReq->source) < 0) return -1;
|
||||
}
|
||||
|
||||
tEndDecode(pCoder);
|
||||
return 0;
|
||||
}
|
||||
|
@ -7663,6 +7668,8 @@ int tEncodeSVCreateTbBatchReq(SEncoder *pCoder, const SVCreateTbBatchReq *pReq)
|
|||
if (tEncodeSVCreateTbReq(pCoder, (SVCreateTbReq *)taosArrayGet(pReq->pArray, iReq)) < 0) return -1;
|
||||
}
|
||||
|
||||
if (tEncodeI8(pCoder, pReq->source) < 0) return -1;
|
||||
|
||||
tEndEncode(pCoder);
|
||||
return 0;
|
||||
}
|
||||
|
@ -7677,6 +7684,10 @@ int tDecodeSVCreateTbBatchReq(SDecoder *pCoder, SVCreateTbBatchReq *pReq) {
|
|||
if (tDecodeSVCreateTbReq(pCoder, pReq->pReqs + iReq) < 0) return -1;
|
||||
}
|
||||
|
||||
if (!tDecodeIsEnd(pCoder)) {
|
||||
if (tDecodeI8(pCoder, &pReq->source) < 0) return -1;
|
||||
}
|
||||
|
||||
tEndDecode(pCoder);
|
||||
return 0;
|
||||
}
|
||||
|
@ -8034,6 +8045,7 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) {
|
|||
break;
|
||||
}
|
||||
if (tEncodeI64(pEncoder, pReq->ctimeMs) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pReq->source) < 0) return -1;
|
||||
|
||||
tEndEncode(pEncoder);
|
||||
return 0;
|
||||
|
@ -8094,6 +8106,9 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) {
|
|||
if (!tDecodeIsEnd(pDecoder)) {
|
||||
if (tDecodeI64(pDecoder, &pReq->ctimeMs) < 0) return -1;
|
||||
}
|
||||
if (!tDecodeIsEnd(pDecoder)) {
|
||||
if (tDecodeI8(pDecoder, &pReq->source) < 0) return -1;
|
||||
}
|
||||
|
||||
tEndDecode(pDecoder);
|
||||
return 0;
|
||||
|
@ -8670,7 +8685,6 @@ static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubm
|
|||
}
|
||||
}
|
||||
if (tEncodeI64(pCoder, pSubmitTbData->ctimeMs) < 0) return -1;
|
||||
if (tEncodeI8(pCoder, pSubmitTbData->source) < 0) return -1;
|
||||
|
||||
tEndEncode(pCoder);
|
||||
return 0;
|
||||
|
@ -8758,12 +8772,6 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
|
|||
goto _exit;
|
||||
}
|
||||
}
|
||||
if (!tDecodeIsEnd(pCoder)) {
|
||||
if (tDecodeI8(pCoder, &pSubmitTbData->source) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
|
||||
tEndDecode(pCoder);
|
||||
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "dmInt.h"
|
||||
#include "systable.h"
|
||||
#include "tgrant.h"
|
||||
|
||||
extern SConfig *tsCfg;
|
||||
|
||||
|
@ -118,11 +117,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
|
|||
req.memTotal = tsTotalMemoryKB * 1024;
|
||||
req.memAvail = req.memTotal - tsRpcQueueMemoryAllowed - 16 * 1024 * 1024;
|
||||
tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
|
||||
char *machine = tGetMachineId();
|
||||
if (machine) {
|
||||
tstrncpy(req.machineId, machine, TSDB_MACHINE_ID_LEN + 1);
|
||||
taosMemoryFreeClear(machine);
|
||||
}
|
||||
tstrncpy(req.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1);
|
||||
|
||||
req.clusterCfg.statusInterval = tsStatusInterval;
|
||||
req.clusterCfg.checkTime = 0;
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
#ifdef TD_TSZ
|
||||
#include "tcompression.h"
|
||||
#include "tglobal.h"
|
||||
#include "tgrant.h"
|
||||
#endif
|
||||
|
||||
static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) {
|
||||
|
@ -137,6 +138,16 @@ int32_t dmInitVars(SDnode *pDnode) {
|
|||
pData->rebootTime = taosGetTimestampMs();
|
||||
pData->dropped = 0;
|
||||
pData->stopped = 0;
|
||||
char *machineId = tGetMachineId();
|
||||
if (machineId) {
|
||||
tstrncpy(pData->machineId, machineId, TSDB_MACHINE_ID_LEN + 1);
|
||||
taosMemoryFreeClear(machineId);
|
||||
} else {
|
||||
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
|
||||
terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE;
|
||||
return -1;
|
||||
#endif
|
||||
}
|
||||
|
||||
pData->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||
if (pData->dnodeHash == NULL) {
|
||||
|
|
|
@ -109,6 +109,7 @@ typedef struct {
|
|||
SMsgCb msgCb;
|
||||
bool validMnodeEps;
|
||||
int64_t ipWhiteVer;
|
||||
char machineId[TSDB_MACHINE_ID_LEN + 1];
|
||||
} SDnodeData;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -464,6 +464,7 @@ typedef struct {
|
|||
char* pAst1;
|
||||
char* pAst2;
|
||||
SRWLatch lock;
|
||||
int8_t source;
|
||||
} SStbObj;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -119,6 +119,7 @@ int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *
|
|||
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList);
|
||||
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream);
|
||||
|
||||
SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream);
|
||||
void destroyStreamTaskIter(SStreamTaskIter *pIter);
|
||||
|
|
|
@ -141,7 +141,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
|
|||
memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
|
||||
taosMemoryFreeClear(machineId);
|
||||
} else {
|
||||
#ifdef TD_ENTERPRISE
|
||||
#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG)
|
||||
terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE;
|
||||
goto _OVER;
|
||||
#endif
|
||||
|
@ -410,9 +410,6 @@ void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
|
|||
dInfo.ep.port = pDnode->port;
|
||||
dInfo.offlineReason = pDnode->offlineReason;
|
||||
tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
|
||||
tstrncpy(dInfo.active, pDnode->active, TSDB_ACTIVE_KEY_LEN);
|
||||
tstrncpy(dInfo.connActive, pDnode->connActive, TSDB_CONN_ACTIVE_KEY_LEN);
|
||||
tstrncpy(dInfo.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1);
|
||||
sdbRelease(pSdb, pDnode);
|
||||
if (mndIsMnode(pMnode, pDnode->id)) {
|
||||
dInfo.isMnode = 1;
|
||||
|
|
|
@ -44,9 +44,8 @@ static bool hasCountWindowNode(SPhysiNode* pNode) {
|
|||
}
|
||||
}
|
||||
|
||||
static bool countWindowStreamTask(SSubplan* pPlan) {
|
||||
SPhysiNode* pNode = pPlan->pNode;
|
||||
return hasCountWindowNode(pNode);
|
||||
static bool isCountWindowStreamTask(SSubplan* pPlan) {
|
||||
return hasCountWindowNode((SPhysiNode*)pPlan->pNode);
|
||||
}
|
||||
|
||||
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
|
||||
|
@ -342,13 +341,13 @@ static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVe
|
|||
}
|
||||
}
|
||||
|
||||
static void haltInitialTaskStatus(SStreamTask* pTask, SSubplan* pPlan) {
|
||||
bool hasCountWindowNode = countWindowStreamTask(pPlan);
|
||||
bool isRelStreamTask = (pTask->hTaskInfo.id.taskId != 0);
|
||||
if (hasCountWindowNode && isRelStreamTask) {
|
||||
static void haltInitialTaskStatus(SStreamTask* pTask, SSubplan* pPlan, bool isFillhistoryTask) {
|
||||
bool hasCountWindowNode = isCountWindowStreamTask(pPlan);
|
||||
|
||||
if (hasCountWindowNode && (!isFillhistoryTask)) {
|
||||
SStreamStatus* pStatus = &pTask->status;
|
||||
mDebug("s-task:0x%x status is set to %s from %s for count window agg task with fill-history option set",
|
||||
pTask->id.taskId, streamTaskGetStatusStr(pStatus->taskStatus), streamTaskGetStatusStr(TASK_STATUS__HALT));
|
||||
mDebug("s-task:0x%x status set %s from %s for count window agg task with fill-history option set",
|
||||
pTask->id.taskId, streamTaskGetStatusStr(TASK_STATUS__HALT), streamTaskGetStatusStr(pStatus->taskStatus));
|
||||
pStatus->taskStatus = TASK_STATUS__HALT;
|
||||
}
|
||||
}
|
||||
|
@ -398,15 +397,17 @@ static void setHTasksId(SStreamObj* pStream) {
|
|||
|
||||
static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t skey,
|
||||
SArray* pVerList, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam) {
|
||||
// new stream task
|
||||
SStreamTask* pTask = buildSourceTask(pStream, pEpset, isFillhistory, useTriggerParam);
|
||||
if (pTask == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
mDebug("doAddSourceTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isFillhistory);
|
||||
|
||||
haltInitialTaskStatus(pTask, plan);
|
||||
if (pStream->conf.fillHistory) {
|
||||
haltInitialTaskStatus(pTask, plan, isFillhistory);
|
||||
}
|
||||
|
||||
streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId);
|
||||
|
||||
|
@ -415,6 +416,7 @@ static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStre
|
|||
terrno = code;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
return TDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -458,6 +458,7 @@ void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int3
|
|||
req.rollup = pStb->ast1Len > 0 ? 1 : 0;
|
||||
req.alterOriData = alterOriData;
|
||||
req.alterOriDataLen = alterOriDataLen;
|
||||
req.source = pStb->source;
|
||||
// todo
|
||||
req.schemaRow.nCols = pStb->numOfColumns;
|
||||
req.schemaRow.version = pStb->colVer;
|
||||
|
@ -774,7 +775,8 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
|
|||
pDst->createdTime = taosGetTimestampMs();
|
||||
pDst->updateTime = pDst->createdTime;
|
||||
pDst->uid =
|
||||
(pCreate->source == TD_REQ_FROM_TAOX) ? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||
(pCreate->source == TD_REQ_FROM_TAOX_OLD || pCreate->source == TD_REQ_FROM_TAOX)
|
||||
? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||
pDst->dbUid = pDb->uid;
|
||||
pDst->tagVer = 1;
|
||||
pDst->colVer = 1;
|
||||
|
@ -790,6 +792,7 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
|
|||
pDst->numOfFuncs = pCreate->numOfFuncs;
|
||||
pDst->commentLen = pCreate->commentLen;
|
||||
pDst->pFuncs = pCreate->pFuncs;
|
||||
pDst->source = pCreate->source;
|
||||
pCreate->pFuncs = NULL;
|
||||
|
||||
if (pDst->commentLen > 0) {
|
||||
|
@ -1033,6 +1036,7 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq
|
|||
memcpy(pDst, pStb, sizeof(SStbObj));
|
||||
taosRUnLockLatch(&pStb->lock);
|
||||
|
||||
pDst->source = createReq->source;
|
||||
pDst->updateTime = taosGetTimestampMs();
|
||||
pDst->numOfColumns = createReq->numOfColumns;
|
||||
pDst->numOfTags = createReq->numOfTags;
|
||||
|
@ -1141,7 +1145,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
|
|||
}
|
||||
} else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) {
|
||||
goto _OVER;
|
||||
} else if (createReq.source == TD_REQ_FROM_TAOX && (createReq.tagVer != 1 || createReq.colVer != 1)) {
|
||||
} else if ((createReq.source == TD_REQ_FROM_TAOX_OLD || createReq.source == TD_REQ_FROM_TAOX) && (createReq.tagVer != 1 || createReq.colVer != 1)) {
|
||||
mInfo("stb:%s, alter table does not need to be done, because table is deleted", createReq.name);
|
||||
code = 0;
|
||||
goto _OVER;
|
||||
|
@ -2572,7 +2576,7 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
if (dropReq.source == TD_REQ_FROM_TAOX && pStb->uid != dropReq.suid) {
|
||||
if ((dropReq.source == TD_REQ_FROM_TAOX_OLD || dropReq.source == TD_REQ_FROM_TAOX) && pStb->uid != dropReq.suid) {
|
||||
code = 0;
|
||||
goto _OVER;
|
||||
}
|
||||
|
|
|
@ -69,7 +69,7 @@ static void addIntoCheckpointList(SArray* pList, const SFailedCheckpointInfo* pI
|
|||
taosArrayPush(pList, pInfo);
|
||||
}
|
||||
|
||||
int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
|
||||
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
|
||||
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint");
|
||||
if (pTrans == NULL) {
|
||||
return terrno;
|
||||
|
@ -119,7 +119,7 @@ static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, in
|
|||
} else {
|
||||
mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name,
|
||||
pStream->uid, transId);
|
||||
code = createStreamResetStatusTrans(pMnode, pStream);
|
||||
code = mndCreateStreamResetStatusTrans(pMnode, pStream);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -261,22 +261,30 @@ int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msg
|
|||
return mndTransAppendRedoAction(pTrans, &action);
|
||||
}
|
||||
|
||||
static bool identicalName(const char* pDb, const char* pParam, int32_t len) {
|
||||
return (strlen(pDb) == len) && (strncmp(pDb, pParam, len) == 0);
|
||||
}
|
||||
|
||||
int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
|
||||
// data in the hash table will be removed automatically, no need to remove it here.
|
||||
SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, len);
|
||||
if (pTransInfo == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
void *pIter = NULL;
|
||||
|
||||
// not checkpoint trans, ignore
|
||||
if (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) {
|
||||
mDebug("not checkpoint trans, not kill it, name:%s, transId:%d", pTransInfo->name, pTransInfo->transId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
|
||||
SStreamTransInfo *pTransInfo = (SStreamTransInfo *)pIter;
|
||||
if (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
char *pDupDBName = strndup(pDBName, len);
|
||||
mndKillTransImpl(pMnode, pTransInfo->transId, pDupDBName);
|
||||
taosMemoryFree(pDupDBName);
|
||||
SStreamObj *pStream = mndGetStreamObj(pMnode, pTransInfo->streamId);
|
||||
if (pStream != NULL) {
|
||||
if (identicalName(pStream->sourceDb, pDBName, len)) {
|
||||
mndKillTransImpl(pMnode, pTransInfo->transId, pStream->sourceDb);
|
||||
} else if (identicalName(pStream->targetDb, pDBName, len)) {
|
||||
mndKillTransImpl(pMnode, pTransInfo->transId, pStream->targetDb);
|
||||
}
|
||||
|
||||
mndReleaseStream(pMnode, pStream);
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -231,6 +231,8 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT
|
|||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
mDebug("set the resume action for trans:%d", pTrans->id);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -2484,7 +2484,7 @@ static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
|
|||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)privilege, false);
|
||||
|
||||
char objName[20] = {0};
|
||||
char objName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(objName, "all", pShow->pMeta->pSchemas[cols].bytes);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)objName, false);
|
||||
|
|
File diff suppressed because one or more lines are too long
|
@ -193,7 +193,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
|
|||
continue;
|
||||
}
|
||||
|
||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE, .source = SOURCE_NULL};
|
||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE};
|
||||
|
||||
int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1;
|
||||
tbData.pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray, true);
|
||||
|
|
|
@ -392,7 +392,7 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
|
|||
pReader->msg.ver);
|
||||
|
||||
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
|
||||
if ((pSubmitTbData->source & sourceExcluded) != 0) {
|
||||
if ((pSubmitTbData->flags & sourceExcluded) != 0) {
|
||||
pReader->nextBlk += 1;
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -267,7 +267,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
|
|||
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_table;
|
||||
}
|
||||
|
||||
if ((pSubmitTbDataRet->source & sourceExcluded) != 0) {
|
||||
if ((pSubmitTbDataRet->flags & sourceExcluded) != 0) {
|
||||
goto loop_table;
|
||||
}
|
||||
if (pRsp->withTbName) {
|
||||
|
@ -335,7 +335,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
|
|||
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_db;
|
||||
}
|
||||
|
||||
if ((pSubmitTbDataRet->source & sourceExcluded) != 0) {
|
||||
if ((pSubmitTbDataRet->flags & sourceExcluded) != 0) {
|
||||
goto loop_db;
|
||||
}
|
||||
if (pRsp->withTbName) {
|
||||
|
|
|
@ -816,7 +816,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
|||
return;
|
||||
}
|
||||
|
||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .source = SOURCE_NULL};
|
||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};
|
||||
code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
continue;
|
||||
|
@ -861,7 +861,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
|||
pTask->execInfo.sink.numOfBlocks += 1;
|
||||
uint64_t groupId = pDataBlock->info.id.groupId;
|
||||
|
||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .source = SOURCE_NULL};
|
||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};
|
||||
|
||||
int32_t* index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId));
|
||||
if (index == NULL) { // no data yet, append it
|
||||
|
|
|
@ -171,6 +171,22 @@ end : {
|
|||
}
|
||||
}
|
||||
|
||||
#define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC) \
|
||||
SDecoder decoder = {0};\
|
||||
TYPE req = {0}; \
|
||||
void* data = POINTER_SHIFT(pHead->body, sizeof(SMsgHead)); \
|
||||
int32_t len = pHead->bodyLen - sizeof(SMsgHead); \
|
||||
tDecoderInit(&decoder, data, len); \
|
||||
if (DECODE_FUNC(&decoder, &req) == 0 && (req.source & TD_REQ_FROM_TAOX) != 0) { \
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, jump meta for, vgId:%d offset %" PRId64 " msgType %d", \
|
||||
pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType); \
|
||||
fetchVer++; \
|
||||
tDecoderClear(&decoder); \
|
||||
continue; \
|
||||
} \
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
|
||||
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
|
||||
SRpcMsg* pMsg, STqOffsetVal* offset) {
|
||||
int code = 0;
|
||||
|
@ -239,6 +255,19 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
goto end;
|
||||
}
|
||||
|
||||
if ((pRequest->sourceExcluded & TD_REQ_FROM_TAOX) != 0) {
|
||||
if (pHead->msgType == TDMT_VND_CREATE_TABLE) {
|
||||
PROCESS_EXCLUDED_MSG(SVCreateTbBatchReq, tDecodeSVCreateTbBatchReq)
|
||||
} else if (pHead->msgType == TDMT_VND_ALTER_TABLE) {
|
||||
PROCESS_EXCLUDED_MSG(SVAlterTbReq, tDecodeSVAlterTbReq)
|
||||
} else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) {
|
||||
PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq)
|
||||
} else if (pHead->msgType == TDMT_VND_DELETE) {
|
||||
fetchVer++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType));
|
||||
tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1);
|
||||
metaRsp.resMsgType = pHead->msgType;
|
||||
|
|
|
@ -2555,7 +2555,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
|
|||
while (1) {
|
||||
// only check here, since the iterate data in memory is very fast.
|
||||
if (pReader->code != TSDB_CODE_SUCCESS) {
|
||||
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr);
|
||||
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", tstrerror(pReader->code), pReader->idStr);
|
||||
return pReader->code;
|
||||
}
|
||||
|
||||
|
@ -2707,7 +2707,7 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
|
|||
|
||||
while (1) {
|
||||
if (pReader->code != TSDB_CODE_SUCCESS) {
|
||||
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr);
|
||||
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", tstrerror(pReader->code), pReader->idStr);
|
||||
return pReader->code;
|
||||
}
|
||||
|
||||
|
@ -2922,7 +2922,7 @@ static int32_t buildBlockFromBufferSeqForPreFileset(STsdbReader* pReader, int64_
|
|||
|
||||
while (1) {
|
||||
if (pReader->code != TSDB_CODE_SUCCESS) {
|
||||
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr);
|
||||
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", tstrerror(pReader->code), pReader->idStr);
|
||||
return pReader->code;
|
||||
}
|
||||
|
||||
|
@ -2964,7 +2964,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader, int64_t en
|
|||
|
||||
while (1) {
|
||||
if (pReader->code != TSDB_CODE_SUCCESS) {
|
||||
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr);
|
||||
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", tstrerror(pReader->code), pReader->idStr);
|
||||
return pReader->code;
|
||||
}
|
||||
|
||||
|
|
|
@ -13,8 +13,8 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "vnd.h"
|
||||
#include "tsdb.h"
|
||||
#include "vnd.h"
|
||||
|
||||
#define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, vType, tags) \
|
||||
do { \
|
||||
|
@ -49,7 +49,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
|
|||
// decode req
|
||||
if (tDeserializeSTableInfoReq(pMsg->pCont, pMsg->contLen, &infoReq) != 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _exit;
|
||||
goto _exit4;
|
||||
}
|
||||
|
||||
metaRsp.dbId = pVnode->config.dbId;
|
||||
|
@ -59,7 +59,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
|
|||
sprintf(tableFName, "%s.%s", infoReq.dbFName, infoReq.tbName);
|
||||
code = vnodeValidateTableHash(pVnode, tableFName);
|
||||
if (code) {
|
||||
goto _exit;
|
||||
goto _exit4;
|
||||
}
|
||||
|
||||
// query meta
|
||||
|
@ -67,7 +67,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
|
|||
|
||||
if (metaGetTableEntryByName(&mer1, infoReq.tbName) < 0) {
|
||||
code = terrno;
|
||||
goto _exit;
|
||||
goto _exit3;
|
||||
}
|
||||
|
||||
metaRsp.tableType = mer1.me.type;
|
||||
|
@ -81,7 +81,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
|
|||
metaRsp.suid = mer1.me.uid;
|
||||
} else if (mer1.me.type == TSDB_CHILD_TABLE) {
|
||||
metaReaderDoInit(&mer2, pVnode->pMeta, META_READER_NOLOCK);
|
||||
if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit;
|
||||
if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit2;
|
||||
|
||||
strcpy(metaRsp.stbName, mer2.me.name);
|
||||
metaRsp.suid = mer2.me.uid;
|
||||
|
@ -125,6 +125,12 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
|
|||
tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp);
|
||||
|
||||
_exit:
|
||||
taosMemoryFree(metaRsp.pSchemas);
|
||||
_exit2:
|
||||
metaReaderClear(&mer2);
|
||||
_exit3:
|
||||
metaReaderClear(&mer1);
|
||||
_exit4:
|
||||
rpcMsg.info = pMsg->info;
|
||||
rpcMsg.pCont = pRsp;
|
||||
rpcMsg.contLen = rspLen;
|
||||
|
@ -141,9 +147,6 @@ _exit:
|
|||
*pMsg = rpcMsg;
|
||||
}
|
||||
|
||||
taosMemoryFree(metaRsp.pSchemas);
|
||||
metaReaderClear(&mer2);
|
||||
metaReaderClear(&mer1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -706,5 +709,5 @@ void *vnodeGetIvtIdx(void *pVnode) {
|
|||
}
|
||||
|
||||
int32_t vnodeGetTableSchema(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid) {
|
||||
return tsdbGetTableSchema(((SVnode*)pVnode)->pMeta, uid, pSchema, suid);
|
||||
return tsdbGetTableSchema(((SVnode *)pVnode)->pMeta, uid, pSchema, suid);
|
||||
}
|
||||
|
|
|
@ -3424,12 +3424,6 @@ _error:
|
|||
// table merge scan operator
|
||||
|
||||
// table merge scan operator
|
||||
// TODO: limit / duration optimization
|
||||
// TODO: get block from tsdReader function, with task killed, func_data all filter out, skip, finish
|
||||
// TODO: error processing, memory freeing
|
||||
// TODO: add log for error and perf
|
||||
// TODO: tsdb reader open/close dynamically
|
||||
// TODO: blockdata deep cleanup
|
||||
|
||||
static int32_t subTblRowCompareFn(const void* pLeft, const void* pRight, void* param) {
|
||||
int32_t left = *(int32_t*)pLeft;
|
||||
|
|
|
@ -99,6 +99,9 @@ void setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
|
|||
(void**)&pCurWin->winInfo.pStatePos, &size);
|
||||
}
|
||||
}
|
||||
if (ts < pCurWin->winInfo.sessionWin.win.ekey) {
|
||||
pBuffInfo->rebuildWindow = true;
|
||||
}
|
||||
} else {
|
||||
code = pAggSup->stateStore.streamStateCountWinAddIfNotExist(
|
||||
pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount, (void**)&pCurWin->winInfo.pStatePos, &size);
|
||||
|
@ -115,8 +118,16 @@ void setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pWinInfo, TSKEY* pTs, int32_t start, int32_t rows, int32_t maxRows,
|
||||
SSHashObj* pStDeleted, bool* pRebuild) {
|
||||
static void removeCountResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey) {
|
||||
SSessionKey key = {0};
|
||||
getSessionHashKey(pKey, &key);
|
||||
tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey));
|
||||
tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey));
|
||||
}
|
||||
|
||||
static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pWinInfo, TSKEY* pTs,
|
||||
int32_t start, int32_t rows, int32_t maxRows, SSHashObj* pStUpdated,
|
||||
SSHashObj* pStDeleted, bool* pRebuild) {
|
||||
SSessionKey sWinKey = pWinInfo->winInfo.sessionWin;
|
||||
int32_t num = 0;
|
||||
for (int32_t i = start; i < rows; i++) {
|
||||
|
@ -148,6 +159,7 @@ static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowI
|
|||
|
||||
if (needDelState) {
|
||||
memcpy(pWinInfo->winInfo.pStatePos->pKey, &pWinInfo->winInfo.sessionWin, sizeof(SSessionKey));
|
||||
removeCountResult(pStUpdated, pAggSup->pResultRows, &sWinKey);
|
||||
if (pWinInfo->winInfo.pStatePos->needFree) {
|
||||
pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &sWinKey);
|
||||
}
|
||||
|
@ -242,7 +254,8 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
|||
setSessionWinOutputInfo(pStUpdated, &curWin.winInfo);
|
||||
slidingRows = *curWin.pWindowCount;
|
||||
if (!buffInfo.rebuildWindow) {
|
||||
winRows = updateCountWindowInfo(pAggSup, &curWin, startTsCols, i, rows, pAggSup->windowCount, pStDeleted, &buffInfo.rebuildWindow);
|
||||
winRows = updateCountWindowInfo(pAggSup, &curWin, startTsCols, i, rows, pAggSup->windowCount, pStUpdated,
|
||||
pStDeleted, &buffInfo.rebuildWindow);
|
||||
}
|
||||
if (buffInfo.rebuildWindow) {
|
||||
SSessionKey range = {0};
|
||||
|
|
|
@ -1125,8 +1125,11 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
|||
int64_t firstRowTs = *(int64_t*)tsCol->pData;
|
||||
if ((pOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) ||
|
||||
(pOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) {
|
||||
continue;
|
||||
}
|
||||
if (bExtractedBlock) {
|
||||
blockDataDestroy(pBlk);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (pBlk != NULL) {
|
||||
|
@ -1149,10 +1152,11 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
|||
tSimpleHashClear(mUidBlk);
|
||||
code = sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tSimpleHashCleanup(mUidBlk);
|
||||
taosArrayDestroy(aBlkSort);
|
||||
taosArrayDestroy(aExtSrc);
|
||||
return code;
|
||||
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
|
||||
blockDataDestroy(taosArrayGetP(aBlkSort, i));
|
||||
}
|
||||
taosArrayClear(aBlkSort);
|
||||
break;
|
||||
}
|
||||
|
||||
int64_t el = taosGetTimestampUs() - p;
|
||||
|
@ -1165,6 +1169,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
|||
szSort = 0;
|
||||
qDebug("source %zu created", taosArrayGetSize(aExtSrc));
|
||||
}
|
||||
|
||||
if (pBlk == NULL) {
|
||||
break;
|
||||
}
|
||||
|
@ -1180,6 +1185,9 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
|||
}
|
||||
|
||||
tSimpleHashCleanup(mUidBlk);
|
||||
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
|
||||
blockDataDestroy(taosArrayGetP(aBlkSort, i));
|
||||
}
|
||||
taosArrayDestroy(aBlkSort);
|
||||
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
|
||||
if (!tsortIsClosed(pHandle)) {
|
||||
|
@ -1188,7 +1196,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
|||
taosArrayDestroy(aExtSrc);
|
||||
tSimpleHashCleanup(mTableNumRows);
|
||||
pHandle->type = SORT_SINGLESOURCE_SORT;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
|
||||
|
|
|
@ -175,4 +175,8 @@ IF(NOT TD_DARWIN)
|
|||
NAME idxFstUT
|
||||
COMMAND idxFstUT
|
||||
)
|
||||
add_test(
|
||||
NAME idxFstTest
|
||||
COMMAND idxFstTest
|
||||
)
|
||||
ENDIF ()
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
|
||||
#include <gtest/gtest.h>
|
||||
#include <algorithm>
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
|
@ -14,6 +15,12 @@
|
|||
#include "tutil.h"
|
||||
void* callback(void* s) { return s; }
|
||||
|
||||
class FstEnv : public ::testing::Test {
|
||||
protected:
|
||||
virtual void SetUp() {}
|
||||
virtual void TearDown() {}
|
||||
};
|
||||
|
||||
static std::string fileName = TD_TMP_DIR_PATH "tindex.tindex";
|
||||
class FstWriter {
|
||||
public:
|
||||
|
@ -154,7 +161,7 @@ class FstReadMemory {
|
|||
int32_t _size;
|
||||
};
|
||||
|
||||
#define L 100
|
||||
#define L 200
|
||||
#define M 100
|
||||
#define N 100
|
||||
|
||||
|
@ -200,7 +207,7 @@ void checkMillonWriteAndReadOfFst() {
|
|||
FstWriter* fw = new FstWriter;
|
||||
Performance_fstWriteRecords(fw);
|
||||
delete fw;
|
||||
FstReadMemory* fr = new FstReadMemory(1024 * 64 * 1024);
|
||||
FstReadMemory* fr = new FstReadMemory(1024 * 8 * 1024);
|
||||
|
||||
if (fr->init()) {
|
||||
printf("success to init fst read");
|
||||
|
@ -637,23 +644,31 @@ void iterTFileReader(char* path, char* uid, char* colName, char* ver) {
|
|||
tfileIteratorDestroy(iter);
|
||||
}
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
// tool to check all kind of fst test
|
||||
// if (argc > 1) { validateTFile(argv[1]); }
|
||||
// if (argc > 4) {
|
||||
// path suid colName ver
|
||||
// iterTFileReader(argv[1], argv[2], argv[3], argv[4]);
|
||||
//}
|
||||
checkFstCheckIterator1();
|
||||
// checkFstCheckIterator2();
|
||||
// checkFstCheckIteratorPrefix();
|
||||
// checkFstCheckIteratorRange1();
|
||||
// checkFstCheckIteratorRange2();
|
||||
// checkFstCheckIteratorRange3();
|
||||
// checkFstLongTerm();
|
||||
// checkFstPrefixSearch();
|
||||
// int main(int argc, char* argv[]) {
|
||||
// // tool to check all kind of fst test
|
||||
// // if (argc > 1) { validateTFile(argv[1]); }
|
||||
// // if (argc > 4) {
|
||||
// // path suid colName ver
|
||||
// // iterTFileReader(argv[1], argv[2], argv[3], argv[4]);
|
||||
// //}
|
||||
// checkFstCheckIterator1();
|
||||
// // checkFstCheckIterator2();
|
||||
// // checkFstCheckIteratorPrefix();
|
||||
// // checkFstCheckIteratorRange1();
|
||||
// // checkFstCheckIteratorRange2();
|
||||
// // checkFstCheckIteratorRange3();
|
||||
// // checkFstLongTerm();
|
||||
// // checkFstPrefixSearch();
|
||||
|
||||
// checkMillonWriteAndReadOfFst();
|
||||
// // checkMillonWriteAndReadOfFst();
|
||||
|
||||
return 1;
|
||||
}
|
||||
// return 1;
|
||||
// }
|
||||
TEST_F(FstEnv, checkIterator1) { checkFstCheckIterator1(); }
|
||||
TEST_F(FstEnv, checkItertor2) { checkFstCheckIterator2(); }
|
||||
TEST_F(FstEnv, checkPrefix) { checkFstCheckIteratorPrefix(); }
|
||||
TEST_F(FstEnv, checkRange1) { checkFstCheckIteratorRange1(); }
|
||||
TEST_F(FstEnv, checkRange2) { checkFstCheckIteratorRange2(); }
|
||||
TEST_F(FstEnv, checkRange3) { checkFstCheckIteratorRange3(); }
|
||||
TEST_F(FstEnv, checkLongTerm) { checkFstLongTerm(); }
|
||||
TEST_F(FstEnv, checkMillonWriteData) { checkMillonWriteAndReadOfFst(); }
|
||||
|
|
|
@ -698,7 +698,7 @@ static const char* jkScanLogicPlanTagCond = "TagCond";
|
|||
static const char* jkScanLogicPlanGroupTags = "GroupTags";
|
||||
static const char* jkScanLogicPlanOnlyMetaCtbIdx = "OnlyMetaCtbIdx";
|
||||
static const char* jkScanLogicPlanFilesetDelimited = "FilesetDelimited";
|
||||
static const char* jkScanLogicPlanparaTablesSort = "paraTablesSort";
|
||||
static const char* jkScanLogicPlanParaTablesSort = "ParaTablesSort";
|
||||
|
||||
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;
|
||||
|
@ -747,7 +747,7 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
|
|||
code = tjsonAddBoolToObject(pJson, jkScanLogicPlanFilesetDelimited, pNode->filesetDelimited);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkScanLogicPlanFilesetDelimited, pNode->paraTablesSort);
|
||||
code = tjsonAddBoolToObject(pJson, jkScanLogicPlanParaTablesSort, pNode->paraTablesSort);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -800,7 +800,7 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
|
|||
code = tjsonGetBoolValue(pJson, jkScanLogicPlanFilesetDelimited, &pNode->filesetDelimited);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkScanLogicPlanFilesetDelimited, &pNode->paraTablesSort);
|
||||
code = tjsonGetBoolValue(pJson, jkScanLogicPlanParaTablesSort, &pNode->paraTablesSort);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -1895,7 +1895,7 @@ static const char* jkTableScanPhysiPlanAssignBlockUid = "AssignBlockUid";
|
|||
static const char* jkTableScanPhysiPlanIgnoreUpdate = "IgnoreUpdate";
|
||||
static const char* jkTableScanPhysiPlanFilesetDelimited = "FilesetDelimited";
|
||||
static const char* jkTableScanPhysiPlanNeedCountEmptyTable = "NeedCountEmptyTable";
|
||||
static const char* jkTableScanPhysiPlanparaTablesSort = "paraTablesSort";
|
||||
static const char* jkTableScanPhysiPlanParaTablesSort = "ParaTablesSort";
|
||||
|
||||
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
|
||||
|
@ -1971,7 +1971,7 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
|||
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanNeedCountEmptyTable, pNode->needCountEmptyTable);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanparaTablesSort, pNode->paraTablesSort);
|
||||
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanParaTablesSort, pNode->paraTablesSort);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -2050,7 +2050,7 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
|
|||
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanNeedCountEmptyTable, &pNode->needCountEmptyTable);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanparaTablesSort, &pNode->paraTablesSort);
|
||||
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanParaTablesSort, &pNode->paraTablesSort);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -285,7 +285,6 @@ static int32_t rebuildTableData(SSubmitTbData* pSrc, SSubmitTbData** pDst) {
|
|||
pTmp->suid = pSrc->suid;
|
||||
pTmp->uid = pSrc->uid;
|
||||
pTmp->sver = pSrc->sver;
|
||||
pTmp->source = pSrc->source;
|
||||
pTmp->pCreateTbReq = NULL;
|
||||
if (pTmp->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
|
||||
if (pSrc->pCreateTbReq) {
|
||||
|
@ -654,7 +653,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
|
|||
goto end;
|
||||
}
|
||||
|
||||
pTableCxt->pData->source = SOURCE_TAOSX;
|
||||
pTableCxt->pData->flags |= TD_REQ_FROM_TAOX;
|
||||
if(tmp == NULL){
|
||||
ret = initTableColSubmitData(pTableCxt);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
|
@ -722,7 +721,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
|
|||
goto end;
|
||||
}
|
||||
fields += sizeof(int8_t) + sizeof(int32_t);
|
||||
if (needChangeLength && version == 1) {
|
||||
if (needChangeLength && version == BLOCK_VERSION_1) {
|
||||
pStart += htonl(colLength[j]);
|
||||
} else {
|
||||
pStart += colLength[j];
|
||||
|
@ -753,7 +752,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
|
|||
goto end;
|
||||
}
|
||||
fields += sizeof(int8_t) + sizeof(int32_t);
|
||||
if (needChangeLength && version == 1) {
|
||||
if (needChangeLength && version == BLOCK_VERSION_1) {
|
||||
pStart += htonl(colLength[i]);
|
||||
} else {
|
||||
pStart += colLength[i];
|
||||
|
|
|
@ -488,6 +488,7 @@ SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SS
|
|||
void* pFileStore = getStateFileStore(pFileState);
|
||||
SStreamStateCur* pCur = streamStateSessionSeekKeyPrev_rocksdb(pFileStore, pWinKey);
|
||||
if (pCur) {
|
||||
pCur->pStreamFileState = pFileState;
|
||||
SSessionKey key = {0};
|
||||
void* pVal = NULL;
|
||||
int len = 0;
|
||||
|
@ -736,6 +737,7 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
|
|||
void* pRockVal = NULL;
|
||||
SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileStore, pKey->groupId);
|
||||
code = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &pRockVal, pVLen);
|
||||
streamStateFreeCur(pCur);
|
||||
if (code == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
|
||||
qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
|
@ -743,7 +745,6 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
|
|||
COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)( (char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)) );
|
||||
if (inSessionWindow(pWinKey, startTs, gap) || (*pWinStateCout) < winCount) {
|
||||
(*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
|
||||
streamStateFreeCur(pCur);
|
||||
goto _end;
|
||||
}
|
||||
}
|
||||
|
@ -751,7 +752,6 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
|
|||
pWinKey->win.ekey = endTs;
|
||||
(*pVal) = createSessionWinBuff(pFileState, pWinKey, NULL, NULL);
|
||||
taosMemoryFree(pRockVal);
|
||||
streamStateFreeCur(pCur);
|
||||
} else {
|
||||
(*pVal) = addNewSessionWindow(pFileState, pWinStates, pWinKey);
|
||||
code = TSDB_CODE_FAILED;
|
||||
|
|
|
@ -1329,7 +1329,6 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) {
|
|||
char *data = taosMemoryMalloc(compressSize);
|
||||
gzFile dstFp = NULL;
|
||||
|
||||
TdFilePtr pFile = NULL;
|
||||
TdFilePtr pSrcFile = NULL;
|
||||
|
||||
pSrcFile = taosOpenFile(srcFileName, TD_FILE_READ | TD_FILE_STREAM);
|
||||
|
@ -1369,8 +1368,8 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) {
|
|||
}
|
||||
|
||||
cmp_end:
|
||||
if (pFile) {
|
||||
taosCloseFile(&pFile);
|
||||
if (fd >= 0) {
|
||||
close(fd);
|
||||
}
|
||||
if (pSrcFile) {
|
||||
taosCloseFile(&pSrcFile);
|
||||
|
|
|
@ -18,24 +18,29 @@
|
|||
#include <math.h>
|
||||
#include <stdbool.h>
|
||||
|
||||
#define BASE_BUF_SIZE 256
|
||||
#define TBASE_BUF_SIZE 256
|
||||
static const char *basis_58 = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz";
|
||||
|
||||
char *base58_encode(const uint8_t *value, int32_t vlen) {
|
||||
const uint8_t *pb = value;
|
||||
const uint8_t *pe = pb + vlen;
|
||||
uint8_t buf[BASE_BUF_SIZE] = {0};
|
||||
uint8_t buf[TBASE_BUF_SIZE] = {0};
|
||||
uint8_t *pbuf = &buf[0];
|
||||
bool bfree = false;
|
||||
int32_t nz = 0, size = 0, len = 0;
|
||||
|
||||
if (vlen > TBASE_MAX_ILEN) {
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
while (pb != pe && *pb == 0) {
|
||||
++pb;
|
||||
++nz;
|
||||
}
|
||||
|
||||
size = (pe - pb) * 69 / 50 + 1;
|
||||
if (size > BASE_BUF_SIZE) {
|
||||
if (size > TBASE_BUF_SIZE) {
|
||||
if (!(pbuf = taosMemoryCalloc(1, size))) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
|
@ -47,7 +52,7 @@ char *base58_encode(const uint8_t *value, int32_t vlen) {
|
|||
int32_t num = *pb;
|
||||
int32_t i = 0;
|
||||
for (int32_t j = (int32_t)size - 1; (num != 0 || i < len) && j >= 0; --j, ++i) {
|
||||
num += ((int32_t)buf[j]) << 8;
|
||||
num += ((int32_t)pbuf[j]) << 8;
|
||||
pbuf[j] = num % 58;
|
||||
num /= 58;
|
||||
}
|
||||
|
@ -57,7 +62,7 @@ char *base58_encode(const uint8_t *value, int32_t vlen) {
|
|||
|
||||
const uint8_t *pi = pbuf + (size - len);
|
||||
while (pi != pbuf + size && *pi == 0) ++pi;
|
||||
uint8_t *result = taosMemoryCalloc(1, size + 1);
|
||||
uint8_t *result = taosMemoryCalloc(1, nz + (pbuf + size - pi) + 1);
|
||||
if (!result) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
if (bfree) taosMemoryFree(pbuf);
|
||||
|
@ -82,20 +87,35 @@ static const signed char index_58[256] = {
|
|||
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1};
|
||||
|
||||
uint8_t *base58_decode(const char *value, size_t inlen, int32_t *outlen) {
|
||||
const char *pb = value;
|
||||
const char *pe = value + inlen;
|
||||
uint8_t buf[BASE_BUF_SIZE] = {0};
|
||||
uint8_t buf[TBASE_BUF_SIZE] = {0};
|
||||
uint8_t *pbuf = &buf[0];
|
||||
bool bfree = false;
|
||||
int32_t nz = 0, size = 0, len = 0;
|
||||
|
||||
while (*value && isspace(*value)) ++value;
|
||||
while (*value == '1') {
|
||||
++nz;
|
||||
++value;
|
||||
if (inlen > TBASE_MAX_OLEN) {
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
size = (int32_t)(pe - value) * 733 / 1000 + 1;
|
||||
if (size > BASE_BUF_SIZE) {
|
||||
while (pb != pe) {
|
||||
if (*pb == 0) {
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return NULL;
|
||||
}
|
||||
++pb;
|
||||
}
|
||||
|
||||
pb = value;
|
||||
while (pb != pe && *pb && isspace(*pb)) ++pb;
|
||||
while (pb != pe && *pb == '1') {
|
||||
++nz;
|
||||
++pb;
|
||||
}
|
||||
|
||||
size = (int32_t)(pe - pb) * 733 / 1000 + 1;
|
||||
if (size > TBASE_BUF_SIZE) {
|
||||
if (!(pbuf = taosMemoryCalloc(1, size))) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
|
@ -103,9 +123,10 @@ uint8_t *base58_decode(const char *value, size_t inlen, int32_t *outlen) {
|
|||
bfree = true;
|
||||
}
|
||||
|
||||
while (*value && !isspace(*value)) {
|
||||
int32_t num = index_58[(uint8_t)*value];
|
||||
while (pb != pe && *pb && !isspace(*pb)) {
|
||||
int32_t num = index_58[(uint8_t)*pb];
|
||||
if (num == -1) {
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
if (bfree) taosMemoryFree(pbuf);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -116,18 +137,18 @@ uint8_t *base58_decode(const char *value, size_t inlen, int32_t *outlen) {
|
|||
num >>= 8;
|
||||
}
|
||||
len = i;
|
||||
++value;
|
||||
++pb;
|
||||
}
|
||||
|
||||
while (isspace(*value)) ++value;
|
||||
if (*value != 0) {
|
||||
while (pb != pe && isspace(*pb)) ++pb;
|
||||
if (*pb != 0) {
|
||||
if (bfree) taosMemoryFree(pbuf);
|
||||
return NULL;
|
||||
}
|
||||
const uint8_t *it = pbuf + (size - len);
|
||||
while (it != pbuf + size && *it == 0) ++it;
|
||||
|
||||
uint8_t *result = taosMemoryCalloc(1, size + 1);
|
||||
uint8_t *result = taosMemoryCalloc(1, nz + (pbuf + size - it) + 1);
|
||||
if (!result) {
|
||||
if (bfree) taosMemoryFree(pbuf);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
#include "tutil.h"
|
||||
|
||||
static int32_t initForwardBackwardPtr(SSkipList *pSkipList);
|
||||
static SSkipListNode *getPriorNode(SSkipList *pSkipList, const char *val, int32_t order, SSkipListNode **pCur);
|
||||
static SSkipListNode * getPriorNode(SSkipList *pSkipList, const char *val, int32_t order, SSkipListNode **pCur);
|
||||
static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode);
|
||||
static void tSkipListCorrectLevel(SSkipList *pSkipList);
|
||||
static SSkipListIterator *doCreateSkipListIterator(SSkipList *pSkipList, int32_t order);
|
||||
|
@ -131,12 +131,14 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) {
|
|||
return pNode;
|
||||
}
|
||||
|
||||
#ifdef BUILD_NO_CALL
|
||||
|
||||
void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t iterate) {
|
||||
SSkipListNode *backward[MAX_SKIP_LIST_LEVEL] = {0};
|
||||
SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
|
||||
bool hasDup = false;
|
||||
char *pKey = NULL;
|
||||
char *pDataKey = NULL;
|
||||
char * pKey = NULL;
|
||||
char * pDataKey = NULL;
|
||||
int32_t compare = 0;
|
||||
|
||||
tSkipListWLock(pSkipList);
|
||||
|
@ -260,6 +262,7 @@ void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode) {
|
|||
tSkipListCorrectLevel(pSkipList);
|
||||
tSkipListUnlock(pSkipList);
|
||||
}
|
||||
#endif
|
||||
|
||||
SSkipListIterator *tSkipListCreateIter(SSkipList *pSkipList) {
|
||||
if (pSkipList == NULL) return NULL;
|
||||
|
@ -350,6 +353,7 @@ void *tSkipListDestroyIter(SSkipListIterator *iter) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
#ifdef BUILD_NO_CALL
|
||||
void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) {
|
||||
if (pSkipList == NULL || pSkipList->level < nlevel || nlevel <= 0) {
|
||||
return;
|
||||
|
@ -358,7 +362,7 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) {
|
|||
SSkipListNode *p = SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, nlevel - 1);
|
||||
|
||||
int32_t id = 1;
|
||||
char *prev = NULL;
|
||||
char * prev = NULL;
|
||||
|
||||
while (p != pSkipList->pTail) {
|
||||
char *key = SL_GET_NODE_KEY(pSkipList, p);
|
||||
|
@ -392,6 +396,7 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) {
|
|||
p = SL_NODE_GET_FORWARD_POINTER(p, nlevel - 1);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **direction, SSkipListNode *pNode, bool isForward) {
|
||||
for (int32_t i = 0; i < pNode->level; ++i) {
|
||||
|
@ -460,7 +465,7 @@ static FORCE_INLINE int32_t tSkipListUnlock(SSkipList *pSkipList) {
|
|||
static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, void *pData) {
|
||||
int32_t compare = 0;
|
||||
bool hasDupKey = false;
|
||||
char *pDataKey = pSkipList->keyFn(pData);
|
||||
char * pDataKey = pSkipList->keyFn(pData);
|
||||
|
||||
if (pSkipList->size == 0) {
|
||||
for (int32_t i = 0; i < pSkipList->maxLevel; i++) {
|
||||
|
@ -516,6 +521,7 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward,
|
|||
return hasDupKey;
|
||||
}
|
||||
|
||||
#ifdef BUILD_NO_CALL
|
||||
static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode) {
|
||||
int32_t level = pNode->level;
|
||||
uint8_t dupMode = SL_DUP_MODE(pSkipList);
|
||||
|
@ -540,6 +546,7 @@ static void tSkipListCorrectLevel(SSkipList *pSkipList) {
|
|||
pSkipList->level -= 1;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
UNUSED_FUNC static FORCE_INLINE void recordNodeEachLevel(SSkipList *pSkipList,
|
||||
int32_t level) { // record link count in each level
|
||||
|
|
|
@ -101,6 +101,22 @@ add_test(
|
|||
COMMAND talgoTest
|
||||
)
|
||||
|
||||
# tbaseCodecTest
|
||||
add_executable(tbaseCodecTest "tbaseCodecTest.cpp")
|
||||
target_link_libraries(tbaseCodecTest os util common gtest_main)
|
||||
add_test(
|
||||
NAME tbaseCodecTest
|
||||
COMMAND tbaseCodecTest
|
||||
)
|
||||
|
||||
# tbaseCodecTest
|
||||
add_executable(tbaseCodecTest "tbaseCodecTest.cpp")
|
||||
target_link_libraries(tbaseCodecTest os util common gtest_main)
|
||||
add_test(
|
||||
NAME tbaseCodecTest
|
||||
COMMAND tbaseCodecTest
|
||||
)
|
||||
|
||||
# bufferTest
|
||||
add_executable(bufferTest "bufferTest.cpp")
|
||||
target_link_libraries(bufferTest os util gtest_main)
|
||||
|
|
|
@ -0,0 +1,83 @@
|
|||
#include <gtest/gtest.h>
|
||||
#include <cassert>
|
||||
|
||||
#include <iostream>
|
||||
#include "os.h"
|
||||
#include "osTime.h"
|
||||
#include "taos.h"
|
||||
#include "taoserror.h"
|
||||
#include "tbase58.h"
|
||||
#include "tglobal.h"
|
||||
|
||||
using namespace std;
|
||||
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Wwrite-strings"
|
||||
#pragma GCC diagnostic ignored "-Wunused-function"
|
||||
#pragma GCC diagnostic ignored "-Wunused-variable"
|
||||
#pragma GCC diagnostic ignored "-Wsign-compare"
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
||||
|
||||
static void checkBase58Codec(uint8_t *pRaw, int32_t rawLen, int32_t index) {
|
||||
int64_t start = taosGetTimestampUs();
|
||||
char *pEnc = base58_encode((const uint8_t *)pRaw, rawLen);
|
||||
ASSERT_NE(nullptr, pEnc);
|
||||
|
||||
int32_t encLen = strlen(pEnc);
|
||||
int64_t endOfEnc = taosGetTimestampUs();
|
||||
std::cout << "index:" << index << ", encLen is " << encLen << ", cost:" << endOfEnc - start << " us" << std::endl;
|
||||
int32_t decLen = 0;
|
||||
char *pDec = (char *)base58_decode((const char *)pEnc, encLen, &decLen);
|
||||
std::cout << "index:" << index << ", decLen is " << decLen << ", cost:" << taosGetTimestampUs() - endOfEnc << " us"
|
||||
<< std::endl;
|
||||
ASSERT_NE(nullptr, pDec);
|
||||
ASSERT_EQ(rawLen, decLen);
|
||||
ASSERT_LE(rawLen, encLen);
|
||||
ASSERT_EQ(0, strncmp((char *)pRaw, pDec, rawLen));
|
||||
taosMemoryFreeClear(pDec);
|
||||
taosMemoryFreeClear(pEnc);
|
||||
}
|
||||
|
||||
TEST(TD_BASE_CODEC_TEST, tbase58_test) {
|
||||
const int32_t TEST_LEN_MAX = TBASE_MAX_ILEN;
|
||||
const int32_t TEST_LEN_STEP = 10;
|
||||
int32_t rawLen = 0;
|
||||
uint8_t *pRaw = NULL;
|
||||
|
||||
pRaw = (uint8_t *)taosMemoryCalloc(1, TEST_LEN_MAX);
|
||||
ASSERT_NE(nullptr, pRaw);
|
||||
|
||||
// 1. normal case
|
||||
// string blend with char and '\0'
|
||||
rawLen = TEST_LEN_MAX;
|
||||
for (int32_t i = 0; i < TEST_LEN_MAX; i += 500) {
|
||||
checkBase58Codec(pRaw, rawLen, i);
|
||||
pRaw[i] = i & 127;
|
||||
}
|
||||
|
||||
// string without '\0'
|
||||
for (int32_t i = 0; i < TEST_LEN_MAX; ++i) {
|
||||
pRaw[i] = i & 127;
|
||||
}
|
||||
checkBase58Codec(pRaw, TEST_LEN_MAX, 0);
|
||||
for (int32_t i = 0; i < TEST_LEN_MAX; i += 500) {
|
||||
rawLen = i;
|
||||
checkBase58Codec(pRaw, rawLen, i);
|
||||
}
|
||||
taosMemoryFreeClear(pRaw);
|
||||
ASSERT_EQ(nullptr, pRaw);
|
||||
|
||||
// 2. overflow case
|
||||
char tmp[1];
|
||||
char *pEnc = base58_encode((const uint8_t *)tmp, TBASE_MAX_ILEN + 1);
|
||||
ASSERT_EQ(nullptr, pEnc);
|
||||
char *pDec = (char *)base58_decode((const char *)tmp, TBASE_MAX_OLEN + 1, NULL);
|
||||
ASSERT_EQ(nullptr, pDec);
|
||||
|
||||
taosMemoryFreeClear(pRaw);
|
||||
ASSERT_EQ(nullptr, pRaw);
|
||||
}
|
|
@ -25,6 +25,7 @@ from frame.cases import *
|
|||
from frame.sql import *
|
||||
from frame.caseBase import *
|
||||
from frame import *
|
||||
from frame.srvCtl import *
|
||||
|
||||
|
||||
class TDTestCase(TBase):
|
||||
|
@ -65,6 +66,21 @@ class TDTestCase(TBase):
|
|||
sql = f"select avg(dc) from {self.db}.{self.stb}"
|
||||
tdSql.checkFirstValue(sql, 200)
|
||||
|
||||
def alterReplica3(self):
|
||||
sql = f"alter database {self.db} replica 3"
|
||||
tdSql.execute(sql, show=True)
|
||||
time.sleep(2)
|
||||
sc.dnodeStop(2)
|
||||
sc.dnodeStop(3)
|
||||
time.sleep(5)
|
||||
sc.dnodeStart(2)
|
||||
sc.dnodeStart(3)
|
||||
|
||||
if self.waitTransactionZero() is False:
|
||||
tdLog.exit(f"{sql} transaction not finished")
|
||||
return False
|
||||
return True
|
||||
|
||||
def doAction(self):
|
||||
tdLog.info(f"do action.")
|
||||
self.flushDb()
|
||||
|
@ -81,7 +97,7 @@ class TDTestCase(TBase):
|
|||
self.alterReplica(1)
|
||||
self.checkAggCorrect()
|
||||
self.compactDb()
|
||||
self.alterReplica(3)
|
||||
self.alterReplica3()
|
||||
|
||||
vgids = self.getVGroup(self.db)
|
||||
selid = random.choice(vgids)
|
||||
|
|
|
@ -28,12 +28,12 @@ from frame import *
|
|||
from frame.eos import *
|
||||
|
||||
#
|
||||
# 192.168.1.52 MINIO S3 API KEY: MQCEIoaPGUs1mhXgpUAu:XTgpN2dEMInnYgqN4gj3G5zgb39ROtsisKKy0GFa
|
||||
# 192.168.1.52 MINIO S3
|
||||
#
|
||||
|
||||
'''
|
||||
s3EndPoint http://192.168.1.52:9000
|
||||
s3AccessKey MQCEIoaPGUs1mhXgpUAu:XTgpN2dEMInnYgqN4gj3G5zgb39ROtsisKKy0GFa
|
||||
s3AccessKey 'zOgllR6bSnw2Ah3mCNel:cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX'
|
||||
s3BucketName ci-bucket
|
||||
s3UploadDelaySec 60
|
||||
'''
|
||||
|
@ -42,7 +42,7 @@ s3UploadDelaySec 60
|
|||
class TDTestCase(TBase):
|
||||
updatecfgDict = {
|
||||
's3EndPoint': 'http://192.168.1.52:9000',
|
||||
's3AccessKey': 'MQCEIoaPGUs1mhXgpUAu:XTgpN2dEMInnYgqN4gj3G5zgb39ROtsisKKy0GFa',
|
||||
's3AccessKey': 'zOgllR6bSnw2Ah3mCNel:cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX',
|
||||
's3BucketName': 'ci-bucket',
|
||||
's3BlockSize': '10240',
|
||||
's3BlockCacheSize': '320',
|
||||
|
@ -78,14 +78,27 @@ class TDTestCase(TBase):
|
|||
self.trimDb(True)
|
||||
|
||||
rootPath = sc.clusterRootPath()
|
||||
cmd = f"ls {rootPath}/dnode1/data20/vnode/vnode*/tsdb/*.data"
|
||||
cmd = f"ls {rootPath}/dnode1/data2*/vnode/vnode*/tsdb/*.data"
|
||||
tdLog.info(cmd)
|
||||
loop = 0
|
||||
while len(eos.runRetList(cmd)) > 0 and loop < 40:
|
||||
time.sleep(5)
|
||||
rets = []
|
||||
while loop < 180:
|
||||
time.sleep(3)
|
||||
rets = eos.runRetList(cmd)
|
||||
cnt = len(rets)
|
||||
if cnt == 0:
|
||||
tdLog.info("All data file upload to server over.")
|
||||
break
|
||||
self.trimDb(True)
|
||||
tdLog.info(f"loop={loop} no upload {cnt} data files wait 3s retry ...")
|
||||
if loop == 0:
|
||||
sc.dnodeStop(1)
|
||||
time.sleep(2)
|
||||
sc.dnodeStart(1)
|
||||
loop += 1
|
||||
tdLog.info(f"loop={loop} wait 5s...")
|
||||
|
||||
if len(rets) > 0:
|
||||
tdLog.exit(f"s3 can not upload all data to server. data files cnt={len(rets)} list={rets}")
|
||||
|
||||
def checkStreamCorrect(self):
|
||||
sql = f"select count(*) from {self.db}.stm1"
|
||||
|
|
|
@ -33,14 +33,14 @@ class srvCtl:
|
|||
# control server
|
||||
#
|
||||
|
||||
# start
|
||||
# start idx base is 1
|
||||
def dnodeStart(self, idx):
|
||||
if clusterDnodes.getModel() == 'cluster':
|
||||
return clusterDnodes.starttaosd(idx)
|
||||
|
||||
return tdDnodes.starttaosd(idx)
|
||||
|
||||
# stop
|
||||
# stop idx base is 1
|
||||
def dnodeStop(self, idx):
|
||||
if clusterDnodes.getModel() == 'cluster':
|
||||
return clusterDnodes.stoptaosd(idx)
|
||||
|
|
|
@ -49,6 +49,7 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname_vgroup.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_interval.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/compact-col.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tms_memleak.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py -Q 2
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py -Q 3
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
import sys
|
||||
from util.log import *
|
||||
from util.cases import *
|
||||
from util.sql import *
|
||||
from util.dnodes import tdDnodes
|
||||
from math import inf
|
||||
|
||||
class TDTestCase:
|
||||
def caseDescription(self):
|
||||
'''
|
||||
case1<shenglian zhou>: [TD-]
|
||||
'''
|
||||
return
|
||||
|
||||
def init(self, conn, logSql, replicaVer=1):
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor(), True)
|
||||
self.conn = conn
|
||||
|
||||
def restartTaosd(self, index=1, dbname="db"):
|
||||
tdDnodes.stop(index)
|
||||
tdDnodes.startWithoutSleep(index)
|
||||
tdSql.execute(f"use tms_memleak")
|
||||
|
||||
def run(self):
|
||||
print("running {}".format(__file__))
|
||||
tdSql.execute("drop database if exists tms_memleak")
|
||||
tdSql.execute("create database if not exists tms_memleak")
|
||||
tdSql.execute('use tms_memleak')
|
||||
|
||||
tdSql.execute('create table st(ts timestamp, f int) tags (t int);')
|
||||
|
||||
tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:01', 1)('2021-04-19 00:00:02', 2)('2021-04-19 00:00:03', 3)('2021-04-19 00:00:04', 4)")
|
||||
|
||||
tdSql.execute("insert into ct2 using st tags(2) values('2021-04-20 00:00:01', 5)('2021-04-20 00:00:02', 6)('2021-04-20 00:00:03', 7)('2021-04-20 00:00:04', 8)")
|
||||
|
||||
tdSql.execute("insert into ct3 using st tags(3) values('2021-04-21 00:00:01', 5)('2021-04-21 00:00:02', 6)('2021-04-21 00:00:03', 7)('2021-04-21 00:00:04', 8)")
|
||||
|
||||
tdSql.execute("insert into ct4 using st tags(4) values('2021-04-22 00:00:01', 5)('2021-04-22 00:00:02', 6)('2021-04-22 00:00:03', 7)('2021-04-22 00:00:04', 8)")
|
||||
|
||||
tdSql.query("select * from st order by ts limit 1 ");
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 1, 1);
|
||||
|
||||
tdSql.execute('drop database tms_memleak')
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
|
@ -323,7 +323,7 @@ class TDTestCase:
|
|||
tdSql.query("select * from st")
|
||||
tdSql.checkRows(8)
|
||||
|
||||
tdSql.execute(f'create topic topic_excluded with meta as database d1')
|
||||
tdSql.execute(f'create topic topic_all with meta as database d1')
|
||||
consumer_dict = {
|
||||
"group.id": "g1",
|
||||
"td.connect.user": "root",
|
||||
|
@ -333,7 +333,7 @@ class TDTestCase:
|
|||
consumer = Consumer(consumer_dict)
|
||||
|
||||
try:
|
||||
consumer.subscribe(["topic_excluded"])
|
||||
consumer.subscribe(["topic_all"])
|
||||
except TmqError:
|
||||
tdLog.exit(f"subscribe error")
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
#include <time.h>
|
||||
#include "taos.h"
|
||||
#include "types.h"
|
||||
#include "tmsg.h"
|
||||
|
||||
static int running = 1;
|
||||
TdFilePtr g_fp = NULL;
|
||||
|
@ -966,7 +967,14 @@ void testConsumeExcluded(int topic_type){
|
|||
tmq_raw_data raw = {0};
|
||||
tmq_get_raw(msg, &raw);
|
||||
if(topic_type == 1){
|
||||
assert(raw.raw_type != 2 && raw.raw_type != 4);
|
||||
assert(raw.raw_type != 2 && raw.raw_type != 4 &&
|
||||
raw.raw_type != TDMT_VND_CREATE_STB &&
|
||||
raw.raw_type != TDMT_VND_ALTER_STB &&
|
||||
raw.raw_type != TDMT_VND_CREATE_TABLE &&
|
||||
raw.raw_type != TDMT_VND_ALTER_TABLE &&
|
||||
raw.raw_type != TDMT_VND_DELETE);
|
||||
assert(raw.raw_type == TDMT_VND_DROP_STB ||
|
||||
raw.raw_type == TDMT_VND_DROP_TABLE);
|
||||
}else if(topic_type == 2){
|
||||
assert(0);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue