Merge pull request #19923 from taosdata/fix/3.0_merge_main

merge main
This commit is contained in:
Xiaoyu Wang 2023-02-13 09:48:45 +08:00 committed by GitHub
commit 6483b92f0c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
53 changed files with 3292 additions and 2046 deletions

View File

@ -14,6 +14,12 @@
[![Build status](https://ci.appveyor.com/api/projects/status/kf3pwh2or5afsgl9/branch/master?svg=true)](https://ci.appveyor.com/project/sangshuduo/tdengine-2n8ge/branch/master)
[![Coverage Status](https://coveralls.io/repos/github/taosdata/TDengine/badge.svg?branch=develop)](https://coveralls.io/github/taosdata/TDengine?branch=develop)
[![CII Best Practices](https://bestpractices.coreinfrastructure.org/projects/4201/badge)](https://bestpractices.coreinfrastructure.org/projects/4201)
<br />
[![Twitter Follow](https://img.shields.io/twitter/follow/tdenginedb?label=TDengine&style=social)](https://twitter.com/tdenginedb)
[![YouTube Channel](https://img.shields.io/badge/Subscribe_@tdengine--white?logo=youtube&style=social)](https://www.youtube.com/@tdengine)
[![Discord Community](https://img.shields.io/badge/Join_Discord--white?logo=discord&style=social)](https://discord.com/invite/VZdSuUg4pS)
[![LinkedIn](https://img.shields.io/badge/Follow_LinkedIn--white?logo=linkedin&style=social)](https://www.linkedin.com/company/tdengine)
[![StackOverflow](https://img.shields.io/badge/Ask_StackOverflow--white?logo=stackoverflow&style=social&logoColor=orange)](https://stackoverflow.com/questions/tagged/tdengine)
English | [简体中文](README-CN.md) | [TDengine Cloud](https://cloud.tdengine.com) | [Learn more about TSDB](https://tdengine.com/tsdb/)

View File

@ -1282,8 +1282,8 @@ int32_t tSerializeSAlterVnodeReplicaReq(void* buf, int32_t bufLen, SAlterVnodeRe
int32_t tDeserializeSAlterVnodeReplicaReq(void* buf, int32_t bufLen, SAlterVnodeReplicaReq* pReq);
typedef struct {
int32_t vgId;
int8_t disable;
int32_t vgId;
int8_t disable;
} SDisableVnodeWriteReq;
int32_t tSerializeSDisableVnodeWriteReq(void* buf, int32_t bufLen, SDisableVnodeWriteReq* pReq);
@ -1783,6 +1783,7 @@ typedef struct {
#define STREAM_FILL_HISTORY_ON 1
#define STREAM_FILL_HISTORY_OFF 0
#define STREAM_DEFAULT_FILL_HISTORY STREAM_FILL_HISTORY_OFF
#define STREAM_DEFAULT_IGNORE_UPDATE 0
#define STREAM_CREATE_STABLE_TRUE 1
#define STREAM_CREATE_STABLE_FALSE 0
@ -1811,7 +1812,8 @@ typedef struct {
// 3.0.2.3
int8_t createStb;
uint64_t targetStbUid;
SArray* fillNullCols; // array of SColLocation
SArray* fillNullCols; // array of SColLocation
int8_t igUpdate;
} SCMCreateStreamReq;
typedef struct {

View File

@ -213,132 +213,132 @@
#define TK_IGNORE 195
#define TK_EXPIRED 196
#define TK_FILL_HISTORY 197
#define TK_SUBTABLE 198
#define TK_KILL 199
#define TK_CONNECTION 200
#define TK_TRANSACTION 201
#define TK_BALANCE 202
#define TK_VGROUP 203
#define TK_MERGE 204
#define TK_REDISTRIBUTE 205
#define TK_SPLIT 206
#define TK_DELETE 207
#define TK_INSERT 208
#define TK_NULL 209
#define TK_NK_QUESTION 210
#define TK_NK_ARROW 211
#define TK_ROWTS 212
#define TK_QSTART 213
#define TK_QEND 214
#define TK_QDURATION 215
#define TK_WSTART 216
#define TK_WEND 217
#define TK_WDURATION 218
#define TK_IROWTS 219
#define TK_ISFILLED 220
#define TK_CAST 221
#define TK_NOW 222
#define TK_TODAY 223
#define TK_TIMEZONE 224
#define TK_CLIENT_VERSION 225
#define TK_SERVER_VERSION 226
#define TK_SERVER_STATUS 227
#define TK_CURRENT_USER 228
#define TK_CASE 229
#define TK_END 230
#define TK_WHEN 231
#define TK_THEN 232
#define TK_ELSE 233
#define TK_BETWEEN 234
#define TK_IS 235
#define TK_NK_LT 236
#define TK_NK_GT 237
#define TK_NK_LE 238
#define TK_NK_GE 239
#define TK_NK_NE 240
#define TK_MATCH 241
#define TK_NMATCH 242
#define TK_CONTAINS 243
#define TK_IN 244
#define TK_JOIN 245
#define TK_INNER 246
#define TK_SELECT 247
#define TK_DISTINCT 248
#define TK_WHERE 249
#define TK_PARTITION 250
#define TK_BY 251
#define TK_SESSION 252
#define TK_STATE_WINDOW 253
#define TK_EVENT_WINDOW 254
#define TK_START 255
#define TK_SLIDING 256
#define TK_FILL 257
#define TK_VALUE 258
#define TK_VALUE_F 259
#define TK_NONE 260
#define TK_PREV 261
#define TK_NULL_F 262
#define TK_LINEAR 263
#define TK_NEXT 264
#define TK_HAVING 265
#define TK_RANGE 266
#define TK_EVERY 267
#define TK_ORDER 268
#define TK_SLIMIT 269
#define TK_SOFFSET 270
#define TK_LIMIT 271
#define TK_OFFSET 272
#define TK_ASC 273
#define TK_NULLS 274
#define TK_ABORT 275
#define TK_AFTER 276
#define TK_ATTACH 277
#define TK_BEFORE 278
#define TK_BEGIN 279
#define TK_BITAND 280
#define TK_BITNOT 281
#define TK_BITOR 282
#define TK_BLOCKS 283
#define TK_CHANGE 284
#define TK_COMMA 285
#define TK_CONCAT 286
#define TK_CONFLICT 287
#define TK_COPY 288
#define TK_DEFERRED 289
#define TK_DELIMITERS 290
#define TK_DETACH 291
#define TK_DIVIDE 292
#define TK_DOT 293
#define TK_EACH 294
#define TK_FAIL 295
#define TK_FILE 296
#define TK_FOR 297
#define TK_GLOB 298
#define TK_ID 299
#define TK_IMMEDIATE 300
#define TK_IMPORT 301
#define TK_INITIALLY 302
#define TK_INSTEAD 303
#define TK_ISNULL 304
#define TK_KEY 305
#define TK_MODULES 306
#define TK_NK_BITNOT 307
#define TK_NK_SEMI 308
#define TK_NOTNULL 309
#define TK_OF 310
#define TK_PLUS 311
#define TK_PRIVILEGE 312
#define TK_RAISE 313
#define TK_REPLACE 314
#define TK_RESTRICT 315
#define TK_ROW 316
#define TK_SEMI 317
#define TK_STAR 318
#define TK_STATEMENT 319
#define TK_STRICT 320
#define TK_STRING 321
#define TK_TIMES 322
#define TK_UPDATE 323
#define TK_UPDATE 198
#define TK_SUBTABLE 199
#define TK_KILL 200
#define TK_CONNECTION 201
#define TK_TRANSACTION 202
#define TK_BALANCE 203
#define TK_VGROUP 204
#define TK_MERGE 205
#define TK_REDISTRIBUTE 206
#define TK_SPLIT 207
#define TK_DELETE 208
#define TK_INSERT 209
#define TK_NULL 210
#define TK_NK_QUESTION 211
#define TK_NK_ARROW 212
#define TK_ROWTS 213
#define TK_QSTART 214
#define TK_QEND 215
#define TK_QDURATION 216
#define TK_WSTART 217
#define TK_WEND 218
#define TK_WDURATION 219
#define TK_IROWTS 220
#define TK_ISFILLED 221
#define TK_CAST 222
#define TK_NOW 223
#define TK_TODAY 224
#define TK_TIMEZONE 225
#define TK_CLIENT_VERSION 226
#define TK_SERVER_VERSION 227
#define TK_SERVER_STATUS 228
#define TK_CURRENT_USER 229
#define TK_CASE 230
#define TK_END 231
#define TK_WHEN 232
#define TK_THEN 233
#define TK_ELSE 234
#define TK_BETWEEN 235
#define TK_IS 236
#define TK_NK_LT 237
#define TK_NK_GT 238
#define TK_NK_LE 239
#define TK_NK_GE 240
#define TK_NK_NE 241
#define TK_MATCH 242
#define TK_NMATCH 243
#define TK_CONTAINS 244
#define TK_IN 245
#define TK_JOIN 246
#define TK_INNER 247
#define TK_SELECT 248
#define TK_DISTINCT 249
#define TK_WHERE 250
#define TK_PARTITION 251
#define TK_BY 252
#define TK_SESSION 253
#define TK_STATE_WINDOW 254
#define TK_EVENT_WINDOW 255
#define TK_START 256
#define TK_SLIDING 257
#define TK_FILL 258
#define TK_VALUE 259
#define TK_VALUE_F 260
#define TK_NONE 261
#define TK_PREV 262
#define TK_NULL_F 263
#define TK_LINEAR 264
#define TK_NEXT 265
#define TK_HAVING 266
#define TK_RANGE 267
#define TK_EVERY 268
#define TK_ORDER 269
#define TK_SLIMIT 270
#define TK_SOFFSET 271
#define TK_LIMIT 272
#define TK_OFFSET 273
#define TK_ASC 274
#define TK_NULLS 275
#define TK_ABORT 276
#define TK_AFTER 277
#define TK_ATTACH 278
#define TK_BEFORE 279
#define TK_BEGIN 280
#define TK_BITAND 281
#define TK_BITNOT 282
#define TK_BITOR 283
#define TK_BLOCKS 284
#define TK_CHANGE 285
#define TK_COMMA 286
#define TK_CONCAT 287
#define TK_CONFLICT 288
#define TK_COPY 289
#define TK_DEFERRED 290
#define TK_DELIMITERS 291
#define TK_DETACH 292
#define TK_DIVIDE 293
#define TK_DOT 294
#define TK_EACH 295
#define TK_FAIL 296
#define TK_FILE 297
#define TK_FOR 298
#define TK_GLOB 299
#define TK_ID 300
#define TK_IMMEDIATE 301
#define TK_IMPORT 302
#define TK_INITIALLY 303
#define TK_INSTEAD 304
#define TK_ISNULL 305
#define TK_KEY 306
#define TK_MODULES 307
#define TK_NK_BITNOT 308
#define TK_NK_SEMI 309
#define TK_NOTNULL 310
#define TK_OF 311
#define TK_PLUS 312
#define TK_PRIVILEGE 313
#define TK_RAISE 314
#define TK_REPLACE 315
#define TK_RESTRICT 316
#define TK_ROW 317
#define TK_SEMI 318
#define TK_STAR 319
#define TK_STATEMENT 320
#define TK_STRICT 321
#define TK_STRING 322
#define TK_TIMES 323
#define TK_VALUES 324
#define TK_VARIABLE 325
#define TK_VIEW 326

View File

@ -400,6 +400,7 @@ typedef struct SStreamOptions {
SNode* pDeleteMark;
int8_t fillHistory;
int8_t ignoreExpired;
int8_t ignoreUpdate;
} SStreamOptions;
typedef struct SCreateStreamStmt {

View File

@ -93,6 +93,7 @@ typedef struct SScanLogicNode {
int64_t watermark;
int64_t deleteMark;
int8_t igExpired;
int8_t igCheckUpdate;
SArray* pSmaIndexes;
SNodeList* pGroupTags;
bool groupSort;
@ -224,6 +225,7 @@ typedef struct SWindowLogicNode {
int64_t watermark;
int64_t deleteMark;
int8_t igExpired;
int8_t igCheckUpdate;
EWindowAlgorithm windowAlgo;
EOrder inputTsOrder;
EOrder outputTsOrder;
@ -364,6 +366,7 @@ typedef struct STableScanPhysiNode {
int64_t watermark;
int8_t igExpired;
bool assignBlockUid;
int8_t igCheckUpdate;
} STableScanPhysiNode;
typedef STableScanPhysiNode STableSeqScanPhysiNode;

View File

@ -36,6 +36,7 @@ typedef struct SPlanContext {
int64_t watermark;
int64_t deleteMark;
int8_t igExpired;
int8_t igCheckUpdate;
char* pMsg;
int32_t msgLen;
const char* pUser;

View File

@ -191,6 +191,7 @@ int32_t walApplyVer(SWal *, int64_t ver);
// read
SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond);
void walCloseReader(SWalReader *pRead);
void walReadReset(SWalReader *pReader);
int32_t walReadVer(SWalReader *pRead, int64_t ver);
int32_t walReadSeekVer(SWalReader *pRead, int64_t ver);
int32_t walNextValidMsg(SWalReader *pRead);

View File

@ -110,6 +110,8 @@ bool taosValidFile(TdFilePtr pFile);
int32_t taosGetErrorFile(TdFilePtr pFile);
int32_t taosCompressFile(char *srcFileName, char *destFileName);
#ifdef __cplusplus
}
#endif

View File

@ -111,6 +111,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x012A)
#define TSDB_CODE_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012B)
#define TSDB_CODE_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x012C)
#define TSDB_CODE_MSG_ENCODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x012D)
#define TSDB_CODE_APP_IS_STARTING TAOS_DEF_ERROR_CODE(0, 0x0130) //
#define TSDB_CODE_APP_IS_STOPPING TAOS_DEF_ERROR_CODE(0, 0x0131) //

View File

@ -62,4 +62,4 @@ target_link_libraries(
if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST})
endif(${BUILD_TEST})

View File

@ -25,6 +25,8 @@
#include "tref.h"
#include "ttimer.h"
static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); }
static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id,
int8_t t) {
char* string = NULL;
@ -690,7 +692,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
pReq.numOfColumns = req.schemaRow.nCols;
pReq.numOfTags = req.schemaTag.nCols;
pReq.commentLen = -1;
pReq.suid = req.suid;
pReq.suid = processSuid(req.suid, pRequest->pDb);
pReq.source = TD_REQ_FROM_TAOX;
pReq.igExists = true;
@ -762,7 +764,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
// build drop stable
pReq.igNotExists = true;
pReq.source = TD_REQ_FROM_TAOX;
pReq.suid = req.suid;
pReq.suid = processSuid(req.suid, pRequest->pDb);
STscObj* pTscObj = pRequest->pTscObj;
SName tableName = {0};
@ -880,6 +882,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
if (pCreateReq->type == TSDB_CHILD_TABLE) {
STableMeta* pTableMeta = NULL;
SName sName = {0};
pCreateReq->ctb.suid = processSuid(pCreateReq->ctb.suid, pRequest->pDb);
toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.stbName, &sName);
code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta);
if (code != TSDB_CODE_SUCCESS) {
@ -1017,6 +1020,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pDropReq = req.pReqs + iReq;
pDropReq->igNotExists = true;
pDropReq->suid = processSuid(pDropReq->suid, pRequest->pDb);
SVgroupInfo pInfo = {0};
SName pName = {0};
@ -1506,11 +1510,11 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
}
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter);
TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD));
if(fields == NULL){
TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD));
if (fields == NULL) {
goto end;
}
for(int i = 0; i < pSW->nCols; i++){
for (int i = 0; i < pSW->nCols; i++) {
fields[i].type = pSW->pSchema[i].type;
fields[i].bytes = pSW->pSchema[i].bytes;
tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name));
@ -1621,7 +1625,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
void** dataTmp = taosArrayGet(rspObj.rsp.createTableReq, j);
int32_t* lenTmp = taosArrayGet(rspObj.rsp.createTableLen, j);
SDecoder decoderTmp = {0};
SDecoder decoderTmp = {0};
SVCreateTbReq pCreateReq = {0};
tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) {
@ -1644,9 +1648,9 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
tDecoderClear(&decoderTmp);
}
if(pCreateReqDst){
if (pCreateReqDst) {
strcpy(pName.tname, pCreateReqDst->ctb.stbName);
}else{
} else {
strcpy(pName.tname, tbName);
}
code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
@ -1667,9 +1671,9 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
goto end;
}
if(pCreateReqDst){
if (pCreateReqDst) {
pTableMeta->vgId = vg.vgId;
pTableMeta->uid = pCreateReqDst->uid;
pTableMeta->uid = pCreateReqDst->uid;
}
void* hData = taosHashGet(pVgHash, &vg.vgId, sizeof(vg.vgId));
if (hData == NULL) {
@ -1677,11 +1681,11 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
}
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter);
TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD));
if(fields == NULL){
TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD));
if (fields == NULL) {
goto end;
}
for(int i = 0; i < pSW->nCols; i++){
for (int i = 0; i < pSW->nCols; i++) {
fields[i].type = pSW->pSchema[i].type;
fields[i].bytes = pSW->pSchema[i].bytes;
tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name));

View File

@ -1017,11 +1017,11 @@ void smlDestroyTableInfo(SSmlHandle *info, SSmlTableInfo *tag) {
taosHashCleanup(kvHash);
}
// if (info->parseJsonByLib) {
// SSmlLineInfo *key = (SSmlLineInfo *)(tag->key);
// if (key != NULL) taosMemoryFree(key->tags);
// }
// taosMemoryFree(tag->key);
// if (info->parseJsonByLib) {
// SSmlLineInfo *key = (SSmlLineInfo *)(tag->key);
// if (key != NULL) taosMemoryFree(key->tags);
// }
// taosMemoryFree(tag->key);
taosArrayDestroy(tag->cols);
taosArrayDestroy(tag->tags);
taosMemoryFree(tag);
@ -1082,8 +1082,7 @@ void smlDestroyInfo(SSmlHandle *info) {
if (info->parseJsonByLib) {
taosMemoryFree(info->lines[i].tags);
}
if(info->lines[i].measureTagsLen != 0)
taosMemoryFree(info->lines[i].measureTag);
if (info->lines[i].measureTagsLen != 0) taosMemoryFree(info->lines[i].measureTag);
}
taosMemoryFree(info->lines);
}
@ -1157,9 +1156,11 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
tinfo = *(SSmlTableInfo **)taosHashGet(info->childTables, elements->measure, elements->measureTagsLen);
} else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
tinfo = *(SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag, elements->measureLen + elements->tagsLen);
tinfo = *(SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag,
elements->measureLen + elements->tagsLen);
} else {
tinfo = *(SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag, elements->measureLen + elements->tagsLen);
tinfo = *(SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag,
elements->measureLen + elements->tagsLen);
}
if (tinfo == NULL) {
@ -1247,9 +1248,9 @@ static int32_t smlInsertData(SSmlHandle *info) {
(*pMeta)->tableMeta->vgId = vg.vgId;
(*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid
code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, (*pMeta)->cols, tableData->cols, (*pMeta)->tableMeta,
tableData->childTableName, tableData->sTableName, tableData->sTableNameLen, info->ttl,
info->msgBuf.buf, info->msgBuf.len);
code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, (*pMeta)->cols, tableData->cols,
(*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen,
info->ttl, info->msgBuf.buf, info->msgBuf.len);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlBindData failed", info->id);
return code;
@ -1371,7 +1372,7 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
if (info->dataFormat) {
SSmlLineInfo element = {0};
code = smlParseTelnetString(info, (char *)tmp, (char *)tmp + len, &element);
if(element.measureTagsLen != 0) taosMemoryFree(element.measureTag);
if (element.measureTagsLen != 0) taosMemoryFree(element.measureTag);
} else {
code = smlParseTelnetString(info, (char *)tmp, (char *)tmp + len, info->lines + i);
}

View File

@ -5485,6 +5485,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
}
if (tEncodeI8(&encoder, pReq->createStb) < 0) return -1;
if (tEncodeU64(&encoder, pReq->targetStbUid) < 0) return -1;
if (tEncodeI32(&encoder, taosArrayGetSize(pReq->fillNullCols)) < 0) return -1;
@ -5494,6 +5495,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if (tEncodeI16(&encoder, pCol->colId) < 0) return -1;
if (tEncodeI8(&encoder, pCol->type) < 0) return -1;
}
if (tEncodeI8(&encoder, pReq->igUpdate) < 0) return -1;
tEndEncode(&encoder);
@ -5577,6 +5579,8 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
}
}
if (tDecodeI8(&decoder, &pReq->igUpdate) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);

View File

@ -650,6 +650,7 @@ typedef struct {
int64_t checkpointFreq; // ms
int64_t currentTick; // do not serialize
int64_t deleteMark;
int8_t igCheckUpdate;
} SStreamObj;
int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);

View File

@ -78,6 +78,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
// 3.0.20
if (tEncodeI64(pEncoder, pObj->checkpointFreq) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->igCheckUpdate) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
@ -145,6 +146,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
// 3.0.20
if (sver >= 2) {
if (tDecodeI64(pDecoder, &pObj->checkpointFreq) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->igCheckUpdate) < 0) return -1;
}
tEndDecode(pDecoder);
return 0;
@ -493,7 +495,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
tlen += tEncodeSMqConsumerEp(buf, pConsumerEp);
cnt++;
}
if(cnt != sz) return -1;
if (cnt != sz) return -1;
tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp);
tlen += taosEncodeString(buf, pSub->dbName);
return tlen;

View File

@ -297,6 +297,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
pObj->triggerParam = pCreate->maxDelay;
pObj->watermark = pCreate->watermark;
pObj->fillHistory = pCreate->fillHistory;
pObj->igCheckUpdate = pCreate->igUpdate;
memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
@ -379,6 +380,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
.triggerType = pObj->trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->trigger,
.watermark = pObj->watermark,
.igExpired = pObj->igExpired,
.igCheckUpdate = pObj->igCheckUpdate,
};
// using ast and param to build physical plan

View File

@ -1409,6 +1409,9 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
int32_t ret = 0;
// get super table
if (tdbTbGet(pMeta->pUidIdx, &pCtbEntry->ctbEntry.suid, sizeof(tb_uid_t), &pData, &nData) != 0) {
metaError("vgId:%d, failed to get stable suid for update. version:%" PRId64, TD_VID(pMeta->pVnode),
pCtbEntry->version);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
ret = -1;
goto end;
}

View File

@ -1691,9 +1691,9 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
" - %" PRId64 " %s",
" - %" PRId64 ", uid:%"PRIu64", %s",
pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey,
pReader->idStr);
pBlockScanInfo->uid, pReader->idStr);
pReader->cost.buildmemBlock += elapsedTime;
return code;
@ -1719,8 +1719,10 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
return false;
}
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo,
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo,
SVersionRange* pVerRange) {
int32_t step = ASCENDING_TRAVERSE(pLastBlockReader->order)? 1:-1;
while (1) {
bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree);
if (!hasVal) {
@ -1729,8 +1731,15 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
TSDBKEY k = TSDBROW_KEY(&row);
if (!hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order,
pVerRange)) {
if (hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order, pVerRange)) {
pScanInfo->lastKey = k.ts;
} else {
// the qualifed ts may equal to k.ts, only a greater version one.
// here we need to fallback one step.
if (pScanInfo->lastKey == k.ts) {
pScanInfo->lastKey -= step;
}
return true;
}
}
@ -2398,6 +2407,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
w.ekey = pScanInfo->lastKey + step;
}
tsdbDebug("init last block reader, window:%"PRId64"-%"PRId64", uid:%"PRIu64", %s", w.skey, w.ekey, pScanInfo->uid, pReader->idStr);
int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC),
pReader->pFileReader, pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange,
pLBlockReader->pInfo, false, pReader->idStr);
@ -2838,18 +2848,6 @@ static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SRea
taosSort(pOrderCheckInfo->tableUidList, total, sizeof(uint64_t), uidComparFunc);
}
// reset the last del file index
static void resetScanBlockLastBlockDelIndex(SReaderStatus* pStatus, int32_t order) {
void* p = taosHashIterate(pStatus->pTableMap, NULL);
while (p != NULL) {
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p;
// reset the last del file index
pScanInfo->lastBlockDelIndex = getInitialDelIndex(pScanInfo->delSkyline, order);
p = taosHashIterate(pStatus->pTableMap, p);
}
}
static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
@ -3043,6 +3041,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
SDataBlockInfo* pInfo = &pReader->pResBlock->info;
pInfo->rows = pBlock->nRow;
pInfo->id.uid = pScanInfo->uid;
pInfo->dataLoad = 0;
pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
setComposedBlockFlag(pReader, false);
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);
@ -3173,7 +3172,6 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
// this file does not have data files, let's start check the last block file if exists
if (pBlockIter->numOfBlocks == 0) {
resetScanBlockLastBlockDelIndex(&pReader->status, pReader->order);
goto _begin;
}
}
@ -3210,7 +3208,6 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
tBlockDataReset(pBlockData);
resetDataBlockIterator(pBlockIter, pReader->order);
resetScanBlockLastBlockDelIndex(&pReader->status, pReader->order);
goto _begin;
} else {
code = initForFirstBlockInFile(pReader, pBlockIter);
@ -3222,7 +3219,6 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
// this file does not have blocks, let's start check the last block file
if (pBlockIter->numOfBlocks == 0) {
resetScanBlockLastBlockDelIndex(&pReader->status, pReader->order);
goto _begin;
}
}

View File

@ -426,7 +426,13 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
SVnode *pVnode = pWriter->pVnode;
ASSERT(pHdr->size + sizeof(SSnapDataHdr) == nData);
ASSERT(pHdr->index == pWriter->index + 1);
if (pHdr->index != pWriter->index + 1) {
vError("vgId:%d, unexpected vnode snapshot msg. index:%" PRId64 ", expected index:%" PRId64, TD_VID(pVnode),
pHdr->index, pWriter->index + 1);
return -1;
}
pWriter->index = pHdr->index;
vDebug("vgId:%d, vnode snapshot write data, index:%" PRId64 " type:%d blockLen:%d", TD_VID(pVnode), pHdr->index,

View File

@ -479,6 +479,8 @@ typedef struct SStreamScanInfo {
SSDataBlock* pRecoverRes;
SSDataBlock* pCreateTbRes;
int8_t igCheckUpdate;
int8_t igExpired;
} SStreamScanInfo;
typedef struct {
@ -869,8 +871,8 @@ int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResul
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize);
void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order);
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo);
int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
int32_t order, int64_t* pData);
int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos, int32_t order,
int64_t* pData);
void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, int64_t groupId,
SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock);
@ -878,8 +880,8 @@ SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag);
SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs);
void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo);
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset) ;
SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo);
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset);
#ifdef __cplusplus
}

View File

@ -1830,28 +1830,30 @@ int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t
int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo,
int32_t* size) {
int32_t total = tableListGetOutputGroups(pTableList);
if (ordinalGroupIndex < 0 || ordinalGroupIndex >= total) {
int32_t totalGroups = tableListGetOutputGroups(pTableList);
int32_t numOfTables = tableListGetSize(pTableList);
if (ordinalGroupIndex < 0 || ordinalGroupIndex >= totalGroups) {
return TSDB_CODE_INVALID_PARA;
}
// here handle two special cases:
// 1. only one group exists, and 2. one table exists for each group.
if (total == 1) {
*size = tableListGetSize(pTableList);
if (totalGroups == 1) {
*size = numOfTables;
*pKeyInfo = (*size == 0) ? NULL : taosArrayGet(pTableList->pTableList, 0);
return TSDB_CODE_SUCCESS;
} else if (total == tableListGetSize(pTableList)) {
} else if (totalGroups == numOfTables) {
*size = 1;
*pKeyInfo = taosArrayGet(pTableList->pTableList, ordinalGroupIndex);
return TSDB_CODE_SUCCESS;
}
int32_t offset = pTableList->groupOffset[ordinalGroupIndex];
if (ordinalGroupIndex < total - 1) {
*size = pTableList->groupOffset[offset + 1] - pTableList->groupOffset[offset];
if (ordinalGroupIndex < totalGroups - 1) {
*size = pTableList->groupOffset[ordinalGroupIndex + 1] - offset;
} else {
*size = total - pTableList->groupOffset[offset] - 1;
*size = numOfTables - offset;
}
*pKeyInfo = taosArrayGet(pTableList->pTableList, offset);

View File

@ -1153,7 +1153,7 @@ void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup
SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->partitionSup = *pParSup;
pScanInfo->pPartScalarSup = pExpr;
if (!pScanInfo->pUpdateInfo) {
if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) {
pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0);
}
}

View File

@ -1433,7 +1433,12 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
dumyInfo.cur.pageId = -1;
bool isClosed = false;
STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
if (tableInserted && isOverdue(tsCol[rowId], &pInfo->twAggSup)) {
bool overDue = isOverdue(tsCol[rowId], &pInfo->twAggSup);
if (pInfo->igExpired && overDue) {
continue;
}
if (tableInserted && overDue) {
win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
isClosed = isCloseWindow(&win, &pInfo->twAggSup);
}
@ -2367,6 +2372,9 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR);
pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
pInfo->partitionSup.needCalc = false;
pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate;
pInfo->igExpired = pTableScanNode->igExpired;
pInfo->twAggSup.maxTs = INT64_MIN;
setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
pTaskInfo);

View File

@ -65,8 +65,8 @@ typedef struct SSysTableScanInfo {
SSDataBlock* pRes;
int64_t numOfBlocks; // extract basic running information.
SLoadRemoteDataInfo loadInfo;
int32_t tbnameSlotId;
SLimitInfo limitInfo;
int32_t tbnameSlotId;
} SSysTableScanInfo;
typedef struct {
@ -355,9 +355,11 @@ static int32_t optSysMergeRslt(SArray* mRslt, SArray* rslt);
static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableScanInfo* pInfo, const char* name,
SExecTaskInfo* pTaskInfo);
void extractTbnameSlotId(SSysTableScanInfo* pInfo, const SScanPhysiNode* pScanNode);
static SSDataBlock* sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo, const char* name,
SSDataBlock* pBlock);
__optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse) {
static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo, const char* name,
SSDataBlock* pBlock);
__optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse) {
if (ctype == OP_TYPE_LOWER_EQUAL || ctype == OP_TYPE_LOWER_THAN) {
*reverse = true;
}
@ -731,7 +733,7 @@ void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfR
pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, dataBlock->pDataBlock, false);
doFilterResult(pInfo->pRes, pFilterInfo);
doFilter(pInfo->pRes, pFilterInfo, NULL);
blockDataCleanup(dataBlock);
}
@ -1210,7 +1212,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false);
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo);
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
blockDataCleanup(p);
numOfRows = 0;
@ -1226,7 +1228,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false);
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo);
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
blockDataCleanup(p);
numOfRows = 0;
@ -1387,7 +1389,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false);
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo);
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
blockDataCleanup(p);
numOfRows = 0;
@ -1403,7 +1405,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false);
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo);
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
blockDataCleanup(p);
numOfRows = 0;
@ -1434,7 +1436,7 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
// the retrieve is executed on the mnode, so return tables that belongs to the information schema database.
if (pInfo->readHandle.mnd != NULL) {
buildSysDbTableInfo(pInfo, pOperator->resultInfo.capacity);
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo);
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
setOperatorCompleted(pOperator);
@ -1578,30 +1580,37 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
pBlock = sysTableScanFromMNode(pOperator, pInfo, name, pTaskInfo);
}
return sysTableScanFillTbName(pOperator, pInfo, name, pBlock);
}
static SSDataBlock* sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo, const char* name,
SSDataBlock* pBlock) {
sysTableScanFillTbName(pOperator, pInfo, name, pBlock);
if (pBlock != NULL) {
if (pInfo->tbnameSlotId != -1) {
SColumnInfoData* pColumnInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, pInfo->tbnameSlotId);
char varTbName[TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE] = {0};
memcpy(varDataVal(varTbName), name, strlen(name));
varDataSetLen(varTbName, strlen(name));
for (int i = 0; i < pBlock->info.rows; ++i) {
colDataAppend(pColumnInfoData, i, varTbName, NULL);
}
doFilterResult(pBlock, pOperator->exprSupp.pFilterInfo);
bool limitReached = applyLimitOffset(&pInfo->limitInfo, pBlock, pTaskInfo);
if (limitReached) {
setOperatorCompleted(pOperator);
}
}
if (pBlock && pBlock->info.rows != 0) {
return pBlock;
return pBlock->info.rows > 0 ? pBlock : NULL;
} else {
return NULL;
}
}
static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo, const char* name,
SSDataBlock* pBlock) {
if (pBlock == NULL) {
return;
}
if (pInfo->tbnameSlotId != -1) {
SColumnInfoData* pColumnInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, pInfo->tbnameSlotId);
char varTbName[TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE] = {0};
memcpy(varDataVal(varTbName), name, strlen(name));
varDataSetLen(varTbName, strlen(name));
colDataAppendNItems(pColumnInfoData, 0, varTbName, pBlock->info.rows);
}
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
}
static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableScanInfo* pInfo, const char* name,
SExecTaskInfo* pTaskInfo) {
if (pOperator->status == OP_EXEC_DONE) {
@ -1665,7 +1674,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
updateLoadRemoteInfo(&pInfo->loadInfo, pRsp->numOfRows, pRsp->compLen, startTs, pOperator);
// todo log the filter info
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo);
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
taosMemoryFree(pRsp);
if (pInfo->pRes->info.rows > 0) {
return pInfo->pRes;
@ -1700,13 +1709,13 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
pInfo->sysInfo = pScanPhyNode->sysInfo;
pInfo->showRewrite = pScanPhyNode->showRewrite;
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
pInfo->pCondition = pScanNode->node.pConditions;
code = filterInitFromNode(pScanNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initLimitInfo(pScanPhyNode->scan.node.pLimit, pScanPhyNode->scan.node.pSlimit, &pInfo->limitInfo);
initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
@ -1801,15 +1810,6 @@ int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code) {
return TSDB_CODE_SUCCESS;
}
SSDataBlock* doFilterResult(SSDataBlock* pDataBlock, SFilterInfo* pFilterInfo) {
if (pFilterInfo == NULL) {
return pDataBlock->info.rows == 0 ? NULL : pDataBlock;
}
doFilter(pDataBlock, pFilterInfo, NULL);
return pDataBlock->info.rows == 0 ? NULL : pDataBlock;
}
static int32_t sysChkFilter__Comm(SNode* pNode) {
// impl
SOperatorNode* pOper = (SOperatorNode*)pNode;

View File

@ -1692,7 +1692,7 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSuppor
SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->windowSup.parentType = type;
pScanInfo->windowSup.pIntervalAggSup = pSup;
if (!pScanInfo->pUpdateInfo) {
if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) {
pScanInfo->pUpdateInfo = updateInfoInitP(pInterval, pTwSup->waterMark);
}
pScanInfo->interval = *pInterval;
@ -2869,7 +2869,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin
}
SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
if (!pScanInfo->pUpdateInfo) {
if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) {
pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark);
}
pScanInfo->twAggSup = *pTwSup;

View File

@ -387,6 +387,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(deleteMark);
COPY_SCALAR_FIELD(igExpired);
COPY_SCALAR_FIELD(igCheckUpdate);
CLONE_NODE_LIST_FIELD(pGroupTags);
COPY_SCALAR_FIELD(groupSort);
CLONE_NODE_LIST_FIELD(pTags);
@ -476,6 +477,7 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p
COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(deleteMark);
COPY_SCALAR_FIELD(igExpired);
COPY_SCALAR_FIELD(igCheckUpdate);
COPY_SCALAR_FIELD(windowAlgo);
COPY_SCALAR_FIELD(inputTsOrder);
COPY_SCALAR_FIELD(outputTsOrder);

View File

@ -1644,6 +1644,7 @@ static const char* jkTableScanPhysiPlanGroupSort = "GroupSort";
static const char* jkTableScanPhysiPlanTags = "Tags";
static const char* jkTableScanPhysiPlanSubtable = "Subtable";
static const char* jkTableScanPhysiPlanAssignBlockUid = "AssignBlockUid";
static const char* jkTableScanPhysiPlanIgnoreUpdate = "IgnoreUpdate";
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
@ -1709,6 +1710,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanAssignBlockUid, pNode->assignBlockUid);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanIgnoreUpdate, pNode->igCheckUpdate);
}
return code;
}
@ -1777,6 +1781,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanAssignBlockUid, &pNode->assignBlockUid);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkTableScanPhysiPlanIgnoreUpdate, &pNode->igCheckUpdate);
}
return code;
}

View File

@ -2078,6 +2078,9 @@ static int32_t physiTableScanNodeInlineToMsg(const void* pObj, STlvEncoder* pEnc
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueBool(pEncoder, pNode->assignBlockUid);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI8(pEncoder, pNode->igCheckUpdate);
}
return code;
}
@ -2154,6 +2157,9 @@ static int32_t msgToPhysiTableScanNodeInline(STlvDecoder* pDecoder, void* pObj)
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueBool(pDecoder, &pNode->assignBlockUid);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueI8(pDecoder, &pNode->igCheckUpdate);
}
return code;
}

View File

@ -568,6 +568,7 @@ stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY duration_literal(C).
stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { ((SStreamOptions*)B)->pWatermark = releaseRawExprNode(pCxt, C); A = B; }
stream_options(A) ::= stream_options(B) IGNORE EXPIRED NK_INTEGER(C). { ((SStreamOptions*)B)->ignoreExpired = taosStr2Int8(C.z, NULL, 10); A = B; }
stream_options(A) ::= stream_options(B) FILL_HISTORY NK_INTEGER(C). { ((SStreamOptions*)B)->fillHistory = taosStr2Int8(C.z, NULL, 10); A = B; }
stream_options(A) ::= stream_options(B) IGNORE UPDATE NK_INTEGER(C). { ((SStreamOptions*)B)->ignoreUpdate = taosStr2Int8(C.z, NULL, 10); A = B; }
subtable_opt(A) ::= . { A = NULL; }
subtable_opt(A) ::= SUBTABLE NK_LP expression(B) NK_RP. { A = releaseRawExprNode(pCxt, B); }
@ -1106,4 +1107,4 @@ null_ordering_opt(A) ::= NULLS LAST.
%fallback ABORT AFTER ATTACH BEFORE BEGIN BITAND BITNOT BITOR BLOCKS CHANGE COMMA CONCAT CONFLICT COPY DEFERRED DELIMITERS DETACH DIVIDE DOT EACH END FAIL
FILE FOR GLOB ID IMMEDIATE IMPORT INITIALLY INSTEAD ISNULL KEY MODULES NK_BITNOT NK_SEMI NOTNULL OF PLUS PRIVILEGE RAISE REPLACE RESTRICT ROW SEMI STAR STATEMENT
STRICT STRING TIMES UPDATE VALUES VARIABLE VIEW WAL.
STRICT STRING TIMES VALUES VARIABLE VIEW WAL.

View File

@ -1813,6 +1813,7 @@ SNode* createStreamOptions(SAstCreateContext* pCxt) {
pOptions->triggerType = STREAM_TRIGGER_AT_ONCE;
pOptions->fillHistory = STREAM_DEFAULT_FILL_HISTORY;
pOptions->ignoreExpired = STREAM_DEFAULT_IGNORE_EXPIRED;
pOptions->ignoreUpdate = STREAM_DEFAULT_IGNORE_UPDATE;
return (SNode*)pOptions;
}

View File

@ -236,6 +236,7 @@ static SKeyword keywordTable[] = {
{"TTL", TK_TTL},
{"UNION", TK_UNION},
{"UNSIGNED", TK_UNSIGNED},
{"UPDATE", TK_UPDATE},
{"USE", TK_USE},
{"USER", TK_USER},
{"USERS", TK_USERS},

View File

@ -1576,7 +1576,8 @@ static int32_t translateBlockDistFunc(STranslateContext* pCtx, SFunctionNode* pF
TSDB_CHILD_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType &&
TSDB_NORMAL_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType))) {
return generateSyntaxErrMsgExt(&pCtx->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC,
"%s is only supported on super table, child table or normal table", pFunc->functionName);
"%s is only supported on super table, child table or normal table",
pFunc->functionName);
}
return TSDB_CODE_SUCCESS;
}
@ -1604,6 +1605,7 @@ static int32_t translateMultiResFunc(STranslateContext* pCxt, SFunctionNode* pFu
}
if (tsKeepColumnName && 1 == LIST_LENGTH(pFunc->pParameterList) && !pFunc->node.asAlias) {
strcpy(pFunc->node.userAlias, ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->userAlias);
strcpy(pFunc->node.aliasName, pFunc->node.userAlias);
}
return TSDB_CODE_SUCCESS;
}
@ -2550,11 +2552,12 @@ static SNode* createMultiResFunc(SFunctionNode* pSrcFunc, SExprNode* pExpr) {
int32_t len = 0;
if (QUERY_NODE_COLUMN == nodeType(pExpr)) {
SColumnNode* pCol = (SColumnNode*)pExpr;
len = snprintf(buf, sizeof(buf), "%s(%s.%s)", pSrcFunc->functionName, pCol->tableAlias, pCol->colName);
strncpy(pFunc->node.aliasName, buf, TMIN(len, sizeof(pFunc->node.aliasName) - 1));
if (tsKeepColumnName) {
strcpy(pFunc->node.userAlias, pCol->colName);
strcpy(pFunc->node.aliasName, pCol->colName);
} else {
len = snprintf(buf, sizeof(buf), "%s(%s.%s)", pSrcFunc->functionName, pCol->tableAlias, pCol->colName);
strncpy(pFunc->node.aliasName, buf, TMIN(len, sizeof(pFunc->node.aliasName) - 1));
len = snprintf(buf, sizeof(buf), "%s(%s)", pSrcFunc->functionName, pCol->colName);
strncpy(pFunc->node.userAlias, buf, TMIN(len, sizeof(pFunc->node.userAlias) - 1));
}
@ -6183,6 +6186,7 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
columnDefNodeToField(pStmt->pTags, &pReq->pTags);
pReq->numOfTags = LIST_LENGTH(pStmt->pTags);
}
pReq->igUpdate = pStmt->pOptions->ignoreUpdate;
}
return code;

File diff suppressed because it is too large Load Diff

View File

@ -775,13 +775,13 @@ TEST_F(ParserInitialCTest, createStream) {
};
auto setCreateStreamReq = [&](const char* pStream, const char* pSrcDb, const char* pSql, const char* pDstStb,
int8_t createStb = STREAM_CREATE_STABLE_TRUE, int8_t igExists = 0) {
int8_t igExists = 0) {
snprintf(expect.name, sizeof(expect.name), "0.%s", pStream);
snprintf(expect.sourceDB, sizeof(expect.sourceDB), "0.%s", pSrcDb);
snprintf(expect.targetStbFullName, sizeof(expect.targetStbFullName), "0.test.%s", pDstStb);
expect.igExists = igExists;
expect.sql = strdup(pSql);
expect.createStb = createStb;
expect.createStb = STREAM_CREATE_STABLE_TRUE;
expect.triggerType = STREAM_TRIGGER_AT_ONCE;
expect.maxDelay = 0;
expect.watermark = 0;
@ -789,15 +789,18 @@ TEST_F(ParserInitialCTest, createStream) {
expect.igExpired = STREAM_DEFAULT_IGNORE_EXPIRED;
};
auto setStreamOptions = [&](int8_t triggerType = STREAM_TRIGGER_AT_ONCE, int64_t maxDelay = 0, int64_t watermark = 0,
int8_t igExpired = STREAM_DEFAULT_IGNORE_EXPIRED,
int8_t fillHistory = STREAM_DEFAULT_FILL_HISTORY) {
expect.triggerType = triggerType;
expect.maxDelay = maxDelay;
expect.watermark = watermark;
expect.fillHistory = fillHistory;
expect.igExpired = igExpired;
};
auto setStreamOptions =
[&](int8_t createStb = STREAM_CREATE_STABLE_TRUE, int8_t triggerType = STREAM_TRIGGER_AT_ONCE,
int64_t maxDelay = 0, int64_t watermark = 0, int8_t igExpired = STREAM_DEFAULT_IGNORE_EXPIRED,
int8_t fillHistory = STREAM_DEFAULT_FILL_HISTORY, int8_t igUpdate = STREAM_DEFAULT_IGNORE_UPDATE) {
expect.createStb = createStb;
expect.triggerType = triggerType;
expect.maxDelay = maxDelay;
expect.watermark = watermark;
expect.fillHistory = fillHistory;
expect.igExpired = igExpired;
expect.igUpdate = igUpdate;
};
auto addTag = [&](const char* pFieldName, uint8_t type, int32_t bytes = 0) {
SField field = {0};
@ -844,6 +847,7 @@ TEST_F(ParserInitialCTest, createStream) {
}
ASSERT_EQ(req.checkpointFreq, expect.checkpointFreq);
ASSERT_EQ(req.createStb, expect.createStb);
ASSERT_EQ(req.igUpdate, expect.igUpdate);
tFreeSCMCreateStreamReq(&req);
});
@ -853,13 +857,13 @@ TEST_F(ParserInitialCTest, createStream) {
setCreateStreamReq(
"s1", "test",
"create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 fill_history 1 into st3 "
"as select count(*) from t1 interval(10s)",
"st3", 1, 1);
setStreamOptions(STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_SECOND, 0, 1);
run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 FILL_HISTORY 1 INTO st3 AS "
"SELECT COUNT(*) "
"FROM t1 INTERVAL(10S)");
"create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 fill_history 1 ignore "
"update 1 into st3 as select count(*) from t1 interval(10s)",
"st3", 1);
setStreamOptions(STREAM_CREATE_STABLE_TRUE, STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND,
10 * MILLISECOND_PER_SECOND, 0, 1, 1);
run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 FILL_HISTORY 1 IGNORE "
"UPDATE 1 INTO st3 AS SELECT COUNT(*) FROM t1 INTERVAL(10S)");
clearCreateStreamReq();
setCreateStreamReq("s1", "test",
@ -876,7 +880,8 @@ TEST_F(ParserInitialCTest, createStream) {
setCreateStreamReq(
"s1", "test",
"create stream s1 into st1 tags(tag2) as select max(c1), c2 from t1 partition by tbname tag2 interval(10s)",
"st1", STREAM_CREATE_STABLE_FALSE);
"st1");
setStreamOptions(STREAM_CREATE_STABLE_FALSE);
run("CREATE STREAM s1 INTO st1 TAGS(tag2) AS SELECT MAX(c1), c2 FROM t1 PARTITION BY TBNAME tag2 INTERVAL(10S)");
clearCreateStreamReq();
}

View File

@ -343,6 +343,13 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
pScan->node.groupAction = GROUP_ACTION_NONE;
pScan->node.resultDataOrder = DATA_ORDER_LEVEL_IN_BLOCK;
if (pCxt->pPlanCxt->streamQuery) {
pScan->triggerType = pCxt->pPlanCxt->triggerType;
pScan->watermark = pCxt->pPlanCxt->watermark;
pScan->deleteMark = pCxt->pPlanCxt->deleteMark;
pScan->igExpired = pCxt->pPlanCxt->igExpired;
pScan->igCheckUpdate = pCxt->pPlanCxt->igCheckUpdate;
}
// set columns to scan
if (TSDB_CODE_SUCCESS == code) {
@ -719,6 +726,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm
pWindow->watermark = pCxt->pPlanCxt->watermark;
pWindow->deleteMark = pCxt->pPlanCxt->deleteMark;
pWindow->igExpired = pCxt->pPlanCxt->igExpired;
pWindow->igCheckUpdate = pCxt->pPlanCxt->igCheckUpdate;
}
pWindow->inputTsOrder = ORDER_ASC;
pWindow->outputTsOrder = ORDER_ASC;

View File

@ -328,10 +328,6 @@ static void scanPathOptSetScanWin(SScanLogicNode* pScan) {
pScan->sliding = ((SWindowLogicNode*)pParent)->sliding;
pScan->intervalUnit = ((SWindowLogicNode*)pParent)->intervalUnit;
pScan->slidingUnit = ((SWindowLogicNode*)pParent)->slidingUnit;
pScan->triggerType = ((SWindowLogicNode*)pParent)->triggerType;
pScan->watermark = ((SWindowLogicNode*)pParent)->watermark;
pScan->deleteMark = ((SWindowLogicNode*)pParent)->deleteMark;
pScan->igExpired = ((SWindowLogicNode*)pParent)->igExpired;
}
}

View File

@ -582,6 +582,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
pTableScan->triggerType = pScanLogicNode->triggerType;
pTableScan->watermark = pScanLogicNode->watermark;
pTableScan->igExpired = pScanLogicNode->igExpired;
pTableScan->igCheckUpdate = pScanLogicNode->igCheckUpdate;
pTableScan->assignBlockUid = pCxt->pPlanCxt->rSmaQuery ? true : false;
int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);

View File

@ -249,6 +249,7 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR
int64_t ts2 = taosGetTimestampNs();
code = walReadVer(pWalHandle, index);
walReadReset(pWalHandle);
int64_t ts3 = taosGetTimestampNs();
// code = walReadVerCached(pWalHandle, index);

View File

@ -484,77 +484,50 @@ static int tdbPageFree(SPage *pPage, int idx, SCell *pCell, int szCell) {
return 0;
}
typedef struct {
int32_t iCell;
int32_t offset;
} SCellIdx;
static int32_t tCellIdxCmprFn(const void *p1, const void *p2) {
if (((SCellIdx *)p1)->offset < ((SCellIdx *)p2)->offset) {
return -1;
} else if (((SCellIdx *)p1)->offset > ((SCellIdx *)p2)->offset) {
return 1;
} else {
return 0;
}
}
static int tdbPageDefragment(SPage *pPage) {
int nFree;
int nCells;
SCell *pCell;
SCell *pNextCell;
SCell *pTCell;
int szCell;
int idx;
int iCell;
int32_t nFree = TDB_PAGE_NFREE(pPage);
int32_t nCell = TDB_PAGE_NCELLS(pPage);
nFree = TDB_PAGE_NFREE(pPage);
nCells = TDB_PAGE_NCELLS(pPage);
if (pPage->pFreeEnd - pPage->pFreeStart >= nFree) {
tdbError("tdb/page-defragment: invalid free range, nFree: %d.", nFree);
return -1;
SCellIdx *aCellIdx = (SCellIdx *)tdbOsMalloc(sizeof(SCellIdx) * nCell);
if (aCellIdx == NULL) return -1;
for (int32_t iCell = 0; iCell < nCell; iCell++) {
aCellIdx[iCell].iCell = iCell;
aCellIdx[iCell].offset = TDB_PAGE_CELL_OFFSET_AT(pPage, iCell);
}
taosSort(aCellIdx, nCell, sizeof(SCellIdx), tCellIdxCmprFn);
// Loop to compact the page content
// Here we use an O(n^2) algorithm to do the job since
// this is a low frequency job.
pNextCell = (u8 *)pPage->pPageFtr;
pCell = NULL;
for (iCell = 0;; iCell++) {
// compact over
if (iCell == nCells) {
pPage->pFreeEnd = pNextCell;
break;
SCell *pNextCell = (u8 *)pPage->pPageFtr;
for (int32_t iCell = nCell - 1; iCell >= 0; iCell--) {
SCell *pCell = TDB_PAGE_CELL_AT(pPage, aCellIdx[iCell].iCell);
int32_t szCell = pPage->xCellSize(pPage, pCell, 0, NULL, NULL);
ASSERT(pNextCell - szCell >= pCell);
pNextCell -= szCell;
if (pNextCell > pCell) {
memmove(pNextCell, pCell, szCell);
TDB_PAGE_CELL_OFFSET_AT_SET(pPage, aCellIdx[iCell].iCell, pNextCell - pPage->pData);
}
for (int i = 0; i < nCells; i++) {
if (TDB_PAGE_CELL_OFFSET_AT(pPage, i) < pNextCell - pPage->pData) {
pTCell = TDB_PAGE_CELL_AT(pPage, i);
if (pCell == NULL || pCell < pTCell) {
pCell = pTCell;
idx = i;
}
} else {
continue;
}
}
if (NULL == pCell) {
tdbError("tdb/page-defragment: null ptr pCell.");
return -1;
}
szCell = (*pPage->xCellSize)(pPage, pCell, 0, NULL, NULL);
if (pCell + szCell > pNextCell) {
tdbError("tdb/page-defragment: invalid cell range, pCell: %p, szCell: %d, pNextCell: %p.", pCell, szCell,
pNextCell);
return -1;
}
if (pCell + szCell < pNextCell) {
memmove(pNextCell - szCell, pCell, szCell);
}
pCell = NULL;
pNextCell = pNextCell - szCell;
TDB_PAGE_CELL_OFFSET_AT_SET(pPage, idx, pNextCell - pPage->pData);
}
if (pPage->pFreeEnd - pPage->pFreeStart != nFree) {
tdbError("tdb/page-defragment: invalid free range, nFree: %d.", nFree);
return -1;
}
pPage->pFreeEnd = pNextCell;
TDB_PAGE_CCELLS_SET(pPage, pPage->pFreeEnd - pPage->pData);
TDB_PAGE_FCELL_SET(pPage, 0);
tdbOsFree(aCellIdx);
ASSERT(pPage->pFreeEnd - pPage->pFreeStart == nFree);
return 0;
}

View File

@ -6,7 +6,11 @@ target_link_libraries(tdbTest tdb gtest gtest_main)
add_executable(tdbUtilTest "tdbUtilTest.cpp")
target_link_libraries(tdbUtilTest tdb gtest gtest_main)
# tdbUtilTest
# overflow pages testing
add_executable(tdbExOVFLTest "tdbExOVFLTest.cpp")
target_link_libraries(tdbExOVFLTest tdb gtest gtest_main)
# page defragment testing
add_executable(tdbPageDefragmentTest "tdbPageDefragmentTest.cpp")
target_link_libraries(tdbPageDefragmentTest tdb gtest gtest_main)

View File

@ -0,0 +1,722 @@
#include <gtest/gtest.h>
#define ALLOW_FORBID_FUNC
#include "os.h"
#include "tdb.h"
#include <shared_mutex>
#include <string>
#include <thread>
#include <vector>
#include "tlog.h"
typedef struct SPoolMem {
int64_t size;
struct SPoolMem *prev;
struct SPoolMem *next;
} SPoolMem;
static SPoolMem *openPool() {
SPoolMem *pPool = (SPoolMem *)taosMemoryMalloc(sizeof(*pPool));
pPool->prev = pPool->next = pPool;
pPool->size = 0;
return pPool;
}
static void clearPool(SPoolMem *pPool) {
SPoolMem *pMem;
do {
pMem = pPool->next;
if (pMem == pPool) break;
pMem->next->prev = pMem->prev;
pMem->prev->next = pMem->next;
pPool->size -= pMem->size;
taosMemoryFree(pMem);
} while (1);
assert(pPool->size == 0);
}
static void closePool(SPoolMem *pPool) {
clearPool(pPool);
taosMemoryFree(pPool);
}
static void *poolMalloc(void *arg, size_t size) {
void *ptr = NULL;
SPoolMem *pPool = (SPoolMem *)arg;
SPoolMem *pMem;
pMem = (SPoolMem *)taosMemoryMalloc(sizeof(*pMem) + size);
if (pMem == NULL) {
assert(0);
}
pMem->size = sizeof(*pMem) + size;
pMem->next = pPool->next;
pMem->prev = pPool;
pPool->next->prev = pMem;
pPool->next = pMem;
pPool->size += pMem->size;
ptr = (void *)(&pMem[1]);
return ptr;
}
static void poolFree(void *arg, void *ptr) {
SPoolMem *pPool = (SPoolMem *)arg;
SPoolMem *pMem;
pMem = &(((SPoolMem *)ptr)[-1]);
pMem->next->prev = pMem->prev;
pMem->prev->next = pMem->next;
pPool->size -= pMem->size;
taosMemoryFree(pMem);
}
static int tKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
int k1, k2;
std::string s1((char *)pKey1 + 3, kLen1 - 3);
std::string s2((char *)pKey2 + 3, kLen2 - 3);
k1 = stoi(s1);
k2 = stoi(s2);
if (k1 < k2) {
return -1;
} else if (k1 > k2) {
return 1;
} else {
return 0;
}
}
static int tDefaultKeyCmpr(const void *pKey1, int keyLen1, const void *pKey2, int keyLen2) {
int mlen;
int cret;
ASSERT(keyLen1 > 0 && keyLen2 > 0 && pKey1 != NULL && pKey2 != NULL);
mlen = keyLen1 < keyLen2 ? keyLen1 : keyLen2;
cret = memcmp(pKey1, pKey2, mlen);
if (cret == 0) {
if (keyLen1 < keyLen2) {
cret = -1;
} else if (keyLen1 > keyLen2) {
cret = 1;
} else {
cret = 0;
}
}
return cret;
}
// TEST(TdbPageDefragmentTest, DISABLED_TbUpsertTest) {
// TEST(TdbPageDefragmentTest, TbUpsertTest) {
//}
// TEST(TdbPageDefragmentTest, DISABLED_TbPGetTest) {
// TEST(TdbPageDefragmentTest, TbPGetTest) {
//}
static void generateBigVal(char *val, int valLen) {
for (int i = 0; i < valLen; ++i) {
char c = char(i & 0xff);
if (c == 0) {
c = 1;
}
val[i] = c;
}
}
static TDB *openEnv(char const *envName, int const pageSize, int const pageNum) {
TDB *pEnv = NULL;
int ret = tdbOpen(envName, pageSize, pageNum, &pEnv, 0);
if (ret) {
pEnv = NULL;
}
return pEnv;
}
static void insertOfp(void) {
int ret = 0;
taosRemoveDir("tdb");
// open Env
int const pageSize = 4096;
int const pageNum = 64;
TDB *pEnv = openEnv("tdb", pageSize, pageNum);
GTEST_ASSERT_NE(pEnv, nullptr);
// open db
TTB *pDb = NULL;
tdb_cmpr_fn_t compFunc = tKeyCmpr;
// ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0);
ret = tdbTbOpen("ofp_insert.db", 12, -1, compFunc, pEnv, &pDb, 0);
GTEST_ASSERT_EQ(ret, 0);
// open the pool
SPoolMem *pPool = openPool();
// start a transaction
TXN *txn = NULL;
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
// generate value payload
// char val[((4083 - 4 - 3 - 2) + 1) * 100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
char val[32605];
int valLen = sizeof(val) / sizeof(val[0]);
generateBigVal(val, valLen);
// insert the generated big data
// char const *key = "key1";
char const *key = "key123456789";
ret = tdbTbInsert(pDb, key, strlen(key), val, valLen, txn);
GTEST_ASSERT_EQ(ret, 0);
// commit current transaction
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
}
TEST(TdbPageDefragmentTest, DISABLED_TbInsertTest) {
// TEST(TdbPageDefragmentTest, TbInsertTest) {
// ofp inserting
insertOfp();
}
TEST(TdbPageDefragmentTest, DISABLED_TbGetTest) {
// TEST(TdbPageDefragmentTest, TbGetTest) {
insertOfp();
// open Env
int const pageSize = 4096;
int const pageNum = 64;
TDB *pEnv = openEnv("tdb", pageSize, pageNum);
GTEST_ASSERT_NE(pEnv, nullptr);
// open db
TTB *pDb = NULL;
tdb_cmpr_fn_t compFunc = tKeyCmpr;
// int ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0);
int ret = tdbTbOpen("ofp_insert.db", 12, -1, compFunc, pEnv, &pDb, 0);
GTEST_ASSERT_EQ(ret, 0);
// generate value payload
// char val[((4083 - 4 - 3 - 2) + 1) * 100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
char val[32605];
int valLen = sizeof(val) / sizeof(val[0]);
generateBigVal(val, valLen);
{ // Query the data
void *pVal = NULL;
int vLen;
// char const *key = "key1";
char const *key = "key123456789";
ret = tdbTbGet(pDb, key, strlen(key), &pVal, &vLen);
ASSERT(ret == 0);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(vLen, valLen);
GTEST_ASSERT_EQ(memcmp(val, pVal, vLen), 0);
tdbFree(pVal);
}
}
TEST(TdbPageDefragmentTest, DISABLED_TbDeleteTest) {
// TEST(TdbPageDefragmentTest, TbDeleteTest) {
int ret = 0;
taosRemoveDir("tdb");
// open Env
int const pageSize = 4096;
int const pageNum = 64;
TDB *pEnv = openEnv("tdb", pageSize, pageNum);
GTEST_ASSERT_NE(pEnv, nullptr);
// open db
TTB *pDb = NULL;
tdb_cmpr_fn_t compFunc = tKeyCmpr;
ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0);
GTEST_ASSERT_EQ(ret, 0);
// open the pool
SPoolMem *pPool = openPool();
// start a transaction
TXN *txn;
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
// generate value payload
// char val[((4083 - 4 - 3 - 2) + 1) * 100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
char val[((4083 - 4 - 3 - 2) + 1) * 2]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
int valLen = sizeof(val) / sizeof(val[0]);
generateBigVal(val, valLen);
{ // insert the generated big data
ret = tdbTbInsert(pDb, "key1", strlen("key1"), val, valLen, txn);
GTEST_ASSERT_EQ(ret, 0);
}
{ // query the data
void *pVal = NULL;
int vLen;
ret = tdbTbGet(pDb, "key1", strlen("key1"), &pVal, &vLen);
ASSERT(ret == 0);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(vLen, valLen);
GTEST_ASSERT_EQ(memcmp(val, pVal, vLen), 0);
tdbFree(pVal);
}
/* open to debug committed file
tdbCommit(pEnv, &txn);
tdbTxnClose(&txn);
++txnid;
tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pEnv, &txn);
*/
{ // upsert the data
ret = tdbTbUpsert(pDb, "key1", strlen("key1"), "value1", strlen("value1"), txn);
GTEST_ASSERT_EQ(ret, 0);
}
{ // query the upserted data
void *pVal = NULL;
int vLen;
ret = tdbTbGet(pDb, "key1", strlen("key1"), &pVal, &vLen);
ASSERT(ret == 0);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(vLen, strlen("value1"));
GTEST_ASSERT_EQ(memcmp("value1", pVal, vLen), 0);
tdbFree(pVal);
}
{ // delete the data
ret = tdbTbDelete(pDb, "key1", strlen("key1"), txn);
GTEST_ASSERT_EQ(ret, 0);
}
{ // query the deleted data
void *pVal = NULL;
int vLen = -1;
ret = tdbTbGet(pDb, "key1", strlen("key1"), &pVal, &vLen);
ASSERT(ret == -1);
GTEST_ASSERT_EQ(ret, -1);
GTEST_ASSERT_EQ(vLen, -1);
GTEST_ASSERT_EQ(pVal, nullptr);
tdbFree(pVal);
}
// commit current transaction
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
}
TEST(TdbPageDefragmentTest, DISABLED_simple_insert1) {
// TEST(TdbPageDefragmentTest, simple_insert1) {
int ret;
TDB *pEnv;
TTB *pDb;
tdb_cmpr_fn_t compFunc;
int nData = 1;
TXN *txn;
int const pageSize = 4096;
taosRemoveDir("tdb");
// Open Env
ret = tdbOpen("tdb", pageSize, 64, &pEnv, 0);
GTEST_ASSERT_EQ(ret, 0);
// Create a database
compFunc = tKeyCmpr;
ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb, 0);
GTEST_ASSERT_EQ(ret, 0);
{
char key[64];
// char val[(4083 - 4 - 3 - 2)]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
char val[(4083 - 4 - 3 - 2) + 1]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
int64_t poolLimit = 4096; // 1M pool limit
SPoolMem *pPool;
// open the pool
pPool = openPool();
// start a transaction
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
for (int iData = 1; iData <= nData; iData++) {
sprintf(key, "key0");
sprintf(val, "value%d", iData);
// ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
// GTEST_ASSERT_EQ(ret, 0);
// generate value payload
int valLen = sizeof(val) / sizeof(val[0]);
for (int i = 6; i < valLen; ++i) {
char c = char(i & 0xff);
if (c == 0) {
c = 1;
}
val[i] = c;
}
ret = tdbTbInsert(pDb, "key1", strlen("key1"), val, valLen, txn);
GTEST_ASSERT_EQ(ret, 0);
// if pool is full, commit the transaction and start a new one
if (pPool->size >= poolLimit) {
// commit current transaction
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
// start a new transaction
clearPool(pPool);
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
}
}
// commit the transaction
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
{ // Query the data
void *pVal = NULL;
int vLen;
for (int i = 1; i <= nData; i++) {
sprintf(key, "key%d", i);
// sprintf(val, "value%d", i);
ret = tdbTbGet(pDb, key, strlen(key), &pVal, &vLen);
ASSERT(ret == 0);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(vLen, sizeof(val) / sizeof(val[0]));
GTEST_ASSERT_EQ(memcmp(val, pVal, vLen), 0);
}
tdbFree(pVal);
}
{ // Iterate to query the DB data
TBC *pDBC;
void *pKey = NULL;
void *pVal = NULL;
int vLen, kLen;
int count = 0;
ret = tdbTbcOpen(pDb, &pDBC, NULL);
GTEST_ASSERT_EQ(ret, 0);
tdbTbcMoveToFirst(pDBC);
for (;;) {
ret = tdbTbcNext(pDBC, &pKey, &kLen, &pVal, &vLen);
if (ret < 0) break;
// std::cout.write((char *)pKey, kLen) /* << " " << kLen */ << " ";
// std::cout.write((char *)pVal, vLen) /* << " " << vLen */;
// std::cout << std::endl;
count++;
}
GTEST_ASSERT_EQ(count, nData);
tdbTbcClose(pDBC);
tdbFree(pKey);
tdbFree(pVal);
}
}
ret = tdbTbDrop(pDb);
GTEST_ASSERT_EQ(ret, 0);
// Close a database
tdbTbClose(pDb);
// Close Env
ret = tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
}
// TEST(TdbPageDefragmentTest, DISABLED_seq_insert) {
TEST(TdbPageDefragmentTest, seq_insert) {
int ret = 0;
TDB *pEnv = NULL;
TTB *pDb = NULL;
tdb_cmpr_fn_t compFunc;
int nData = 64 * 1024;
TXN *txn = NULL;
int const pageSize = 1 * 1024 * 1024;
taosRemoveDir("tdb");
// Open Env
ret = tdbOpen("tdb", pageSize, 64, &pEnv, 0);
GTEST_ASSERT_EQ(ret, 0);
// Create a database
compFunc = tKeyCmpr;
ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb, 0);
GTEST_ASSERT_EQ(ret, 0);
// 1, insert nData kv
{
char key[64];
// char val[(4083 - 4 - 3 - 2)]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
char val[(4083 - 4 - 3 - 2) + 1]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
int64_t poolLimit = 4096; // 1M pool limit
SPoolMem *pPool;
// open the pool
pPool = openPool();
// start a transaction
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
for (int iData = 0; iData < nData; ++iData) {
sprintf(key, "key%d", iData);
sprintf(val, "value%d", iData);
ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), txn);
GTEST_ASSERT_EQ(ret, 0);
/*
// generate value payload
int valLen = sizeof(val) / sizeof(val[0]);
for (int i = 6; i < valLen; ++i) {
char c = char(i & 0xff);
if (c == 0) {
c = 1;
}
val[i] = c;
}
ret = tdbTbInsert(pDb, key, strlen(key), val, valLen, txn);
GTEST_ASSERT_EQ(ret, 0);
*/
// if pool is full, commit the transaction and start a new one
if (pPool->size >= poolLimit) {
// commit current transaction
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
// start a new transaction
clearPool(pPool);
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
}
}
// commit the transaction
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
// 2, delete nData/2 records
}
// Close a database
tdbTbClose(pDb);
// Close Env
ret = tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
}
// TEST(TdbPageDefragmentTest, DISABLED_seq_delete) {
TEST(TdbPageDefragmentTest, seq_delete) {
int ret = 0;
TDB *pEnv = NULL;
TTB *pDb = NULL;
tdb_cmpr_fn_t compFunc;
int nData = 64 * 1024;
TXN *txn = NULL;
int const pageSize = 1 * 1024 * 1024;
// Open Env
ret = tdbOpen("tdb", pageSize, 64, &pEnv, 0);
GTEST_ASSERT_EQ(ret, 0);
// Create a database
compFunc = tKeyCmpr;
ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb, 0);
GTEST_ASSERT_EQ(ret, 0);
// 2, delete nData/2 records
{
char key[64];
// char val[(4083 - 4 - 3 - 2)]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
char val[(4083 - 4 - 3 - 2) + 1]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
int64_t poolLimit = 4096; // 1M pool limit
SPoolMem *pPool;
// open the pool
pPool = openPool();
// start a transaction
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
for (int iData = 1; iData <= nData; iData++) {
if (iData % 2 == 0) continue;
sprintf(key, "key%d", iData);
sprintf(val, "value%d", iData);
{ // delete the data
ret = tdbTbDelete(pDb, key, strlen(key), txn);
GTEST_ASSERT_EQ(ret, 0);
}
/*
// generate value payload
int valLen = sizeof(val) / sizeof(val[0]);
for (int i = 6; i < valLen; ++i) {
char c = char(i & 0xff);
if (c == 0) {
c = 1;
}
val[i] = c;
}
ret = tdbTbInsert(pDb, key, strlen(key), val, valLen, txn);
GTEST_ASSERT_EQ(ret, 0);
*/
// if pool is full, commit the transaction and start a new one
if (pPool->size >= poolLimit) {
// commit current transaction
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
// start a new transaction
clearPool(pPool);
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
}
}
// commit the transaction
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
}
// Close a database
tdbTbClose(pDb);
// Close Env
ret = tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
}
// TEST(TdbPageDefragmentTest, DISABLED_defragment_insert) {
TEST(TdbPageDefragmentTest, defragment_insert) {
int ret = 0;
TDB *pEnv = NULL;
TTB *pDb = NULL;
tdb_cmpr_fn_t compFunc;
int nData = 64 * 1024;
TXN *txn = NULL;
int const pageSize = 1 * 1024 * 1024;
// Open Env
ret = tdbOpen("tdb", pageSize, 64, &pEnv, 0);
GTEST_ASSERT_EQ(ret, 0);
// Create a database
compFunc = tKeyCmpr;
ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb, 0);
GTEST_ASSERT_EQ(ret, 0);
// 3, insert 32k records
{
char key[64];
// char val[(4083 - 4 - 3 - 2)]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
char val[(4083 - 4 - 3 - 2) + 1]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
int64_t poolLimit = 4096; // 1M pool limit
SPoolMem *pPool;
// open the pool
pPool = openPool();
// start a transaction
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
for (int iData = nData + 1; iData <= nData * 2; iData++) {
// if (iData % 2 == 0) continue;
sprintf(key, "key%d", iData);
sprintf(val, "value%d", iData);
ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), txn);
GTEST_ASSERT_EQ(ret, 0);
/*
// generate value payload
int valLen = sizeof(val) / sizeof(val[0]);
for (int i = 6; i < valLen; ++i) {
char c = char(i & 0xff);
if (c == 0) {
c = 1;
}
val[i] = c;
}
ret = tdbTbInsert(pDb, key, strlen(key), val, valLen, txn);
GTEST_ASSERT_EQ(ret, 0);
*/
// if pool is full, commit the transaction and start a new one
if (pPool->size >= poolLimit) {
// commit current transaction
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
// start a new transaction
clearPool(pPool);
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
}
}
// commit the transaction
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
// 2, delete nData/2 records
}
// Close a database
tdbTbClose(pDb);
// Close Env
ret = tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
}

View File

@ -576,3 +576,12 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
return 0;
}
void walReadReset(SWalReader *pReader) {
taosThreadMutexLock(&pReader->mutex);
taosCloseFile(&pReader->pIdxFile);
taosCloseFile(&pReader->pLogFile);
pReader->curInvalid = 1;
pReader->curFileFirstVer = -1;
taosThreadMutexUnlock(&pReader->mutex);
}

View File

@ -37,7 +37,9 @@ if(CHECK_STR2INT_ERROR)
add_definitions(-DTD_CHECK_STR_TO_INT_ERROR)
endif()
target_link_libraries(
os PUBLIC pthread
os
PUBLIC pthread
PUBLIC zlibstatic
)
if(TD_WINDOWS)
target_link_libraries(
@ -67,4 +69,4 @@ ENDIF ()
if(${BUILD_TEST})
add_subdirectory(test)
endif(${BUILD_TEST})
endif(${BUILD_TEST})

View File

@ -15,6 +15,7 @@
#define ALLOW_FORBID_FUNC
#include "os.h"
#include "osSemaphore.h"
#include "zlib.h"
#ifdef WINDOWS
#include <io.h>
@ -870,3 +871,48 @@ bool taosCheckAccessFile(const char *pathname, int32_t tdFileAccessOptions) {
}
bool taosCheckExistFile(const char *pathname) { return taosCheckAccessFile(pathname, TD_FILE_ACCESS_EXIST_OK); };
int32_t taosCompressFile(char *srcFileName, char *destFileName) {
int32_t compressSize = 163840;
int32_t ret = 0;
int32_t len = 0;
char *data = taosMemoryMalloc(compressSize);
gzFile dstFp = NULL;
TdFilePtr pSrcFile = taosOpenFile(srcFileName, TD_FILE_READ | TD_FILE_STREAM);
if (pSrcFile == NULL) {
ret = -1;
goto cmp_end;
}
TdFilePtr pFile = taosOpenFile(destFileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
ret = -2;
goto cmp_end;
}
dstFp = gzdopen(pFile->fd, "wb6f");
if (dstFp == NULL) {
ret = -3;
taosCloseFile(&pFile);
goto cmp_end;
}
while (!feof(pSrcFile->fp)) {
len = (int32_t)fread(data, 1, compressSize, pSrcFile->fp);
(void)gzwrite(dstFp, data, len);
}
cmp_end:
if (pSrcFile) {
taosCloseFile(&pSrcFile);
}
if (dstFp) {
gzclose(dstFp);
}
taosMemoryFree(data);
return ret;
}

View File

@ -88,6 +88,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NEED_RETRY, "Retry needed")
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE, "Out of memory in rpc queue")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TIMESTAMP, "Invalid timestamp format")
TAOS_DEFINE_ERROR(TSDB_CODE_MSG_DECODE_ERROR, "Msg decode error")
TAOS_DEFINE_ERROR(TSDB_CODE_MSG_ENCODE_ERROR, "Msg encode error")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_AVAIL_DISK, "No available disk")
TAOS_DEFINE_ERROR(TSDB_CODE_NOT_FOUND, "Not found")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISKSPACE, "Out of disk space")

View File

@ -115,7 +115,6 @@ static int32_t taosPushLogBuffer(SLogBuff *pLogBuf, const char *msg, int32_t m
static SLogBuff *taosLogBuffNew(int32_t bufSize);
static void taosCloseLogByFd(TdFilePtr pFile);
static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum);
static int32_t taosCompressFile(char *srcFileName, char *destFileName);
static FORCE_INLINE void taosUpdateDaylight() {
struct tm Tm, *ptm;
@ -748,50 +747,6 @@ static void *taosAsyncOutputLog(void *param) {
return NULL;
}
int32_t taosCompressFile(char *srcFileName, char *destFileName) {
int32_t compressSize = 163840;
int32_t ret = 0;
int32_t len = 0;
char *data = taosMemoryMalloc(compressSize);
// gzFile dstFp = NULL;
// srcFp = fopen(srcFileName, "r");
TdFilePtr pSrcFile = taosOpenFile(srcFileName, TD_FILE_READ);
if (pSrcFile == NULL) {
ret = -1;
goto cmp_end;
}
TdFilePtr pFile = taosOpenFile(destFileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
ret = -2;
goto cmp_end;
}
// dstFp = gzdopen(fd, "wb6f");
// if (dstFp == NULL) {
// ret = -3;
// close(fd);
// goto cmp_end;
// }
//
// while (!feof(srcFp)) {
// len = (int32_t)fread(data, 1, compressSize, srcFp);
// (void)gzwrite(dstFp, data, len);
// }
cmp_end:
if (pSrcFile) {
taosCloseFile(&pSrcFile);
}
// if (dstFp) {
// gzclose(dstFp);
// }
taosMemoryFree(data);
return ret;
}
bool taosAssertDebug(bool condition, const char *file, int32_t line, const char *format, ...) {
if (condition) return false;

View File

@ -410,7 +410,7 @@
,,y,script,./test.sh -f tmp/monitor.sim
#system test
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/balance_vgroups_r1.py -N 6
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosShell.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosShellError.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosShellNetChk.py

View File

@ -15,7 +15,7 @@ $db = $dbPrefix . $i
sql drop database if exists $db -x step1
step1:
sql create database $db vgroups 1;
sql create database $db vgroups 1 cachemodel 'last_row'
sql use $db
sql create table $tb (ts timestamp, c1 int)
@ -122,4 +122,74 @@ if $data01 != 199 then
return -1
endi
sql drop table t1
$rowNum = 8200
$ts0 = 1537146000000
sql create table t1 (ts timestamp, c1 int)
$i = 0
$ts = $ts0
$x = 0
while $x < $rowNum
$xs = $x * $delta
$ts = $ts0 + $xs
sql insert into t1 values ( $ts , $x )
$x = $x + 1
endw
sql delete from t1 where ts<=1537146409500
sql flush database $db
print =====================================>TD-22007
sql select count(*) from t1 interval(10a)
sql drop table t1
sql create table st1 (ts timestamp, k int) tags(a int);
sql insert into t1 using st1 tags(1) values('2020-1-1 10:10:10', 0);
sql insert into t2 using st1 tags(1) values('2020-1-1 10:10:11', 1);
sql insert into t3 using st1 tags(1) values('2020-1-1 10:10:12', 2);
sql insert into t4 using st1 tags(1) values('2020-1-1 10:10:13', 3);
sql insert into t5 using st1 tags(1) values('2020-1-1 10:10:14', 4);
sql insert into t6 using st1 tags(2) values('2020-1-1 10:10:15', 5);
sql insert into t7 using st1 tags(2) values('2020-1-1 10:10:16', 6);
sql insert into t8 using st1 tags(2) values('2020-1-1 10:10:17', 7);
sql insert into t9 using st1 tags(2) values('2020-1-1 10:10:18', 8);
sql insert into t10 using st1 tags(2) values('2020-1-1 10:10:19', 9);
sql select count(*) from st1
if $data00 != 10 then
return -1
endi
sql select last_row(*) from st1 group by a
if $rows != 2 then
return -1
endi
if $data00 != @20-01-01 10:10:19.000@ then
return -1
endi
if $data01 != 9 then
return -1
endi
if $data10 != @20-01-01 10:10:14.000@ then
return -1
endi
if $data11 != 4 then
return -1
endi
print ===============================================> TS-2613
sql select * from information_schema.ins_databases limit 1 offset 1;
if $rows != 1 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -81,6 +81,15 @@ if $rows == 0 then
return -1
endi
sleep 3000
sql drop stream if exists streamd1;
sql drop stream if exists streamd2;
sql drop stream if exists streamd3;
sql drop stream if exists streamd4;
sql drop stream if exists streamd5;
sql drop stream if exists streamd6;
sql create stream streamd10 into streamd10 as select _wstart, _wend, count(*), first(ca), last(cb) as c2 from t1 interval(10s);
sql desc streamd10;
@ -100,15 +109,6 @@ if $rows == 0 then
return -1
endi
sleep 3000
sql drop stream if exists streamd1;
sql drop stream if exists streamd2;
sql drop stream if exists streamd3;
sql drop stream if exists streamd4;
sql drop stream if exists streamd5;
sql drop stream if exists streamd6;
_OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -0,0 +1,285 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print step 1 start
sql drop stream if exists streams0;
sql drop database if exists test;
sql create database test vgroups 1;
sql use test;
sql create table t1(ts timestamp, a int, b int , c int);
print create stream streams0 trigger at_once ignore update 1 into streamt as select _wstart c1, count(*) c2, max(b) c3 from t1 interval(10s);
sql create stream streams0 trigger at_once ignore update 1 into streamt as select _wstart c1, count(*) c2, max(b) c3 from t1 interval(10s);
sql insert into t1 values(1648791213000,1,1,1);
sql insert into t1 values(1648791213000,2,2,2);
$loop_count = 0
loop0:
sleep 300
sql select * from streamt order by 1,2,3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop0
endi
if $data02 != 2 then
print =====data02=$data02
goto loop0
endi
sql insert into t1 values(1648791213000,3,3,3);
$loop_count = 0
loop1:
sleep 300
sql select * from streamt order by 1,2,3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 3 then
print =====data01=$data01
goto loop1
endi
if $data02 != 3 then
print =====data02=$data02
goto loop1
endi
print step 1 end
print step 2 start
sql drop stream if exists streams1;
sql drop database if exists test1;
sql create database test1 vgroups 1;
sql use test1;
sql create table t1(ts timestamp, a int, b int , c int);
print create stream streams1 trigger at_once ignore update 1 into streamt1 as select _wstart c1, count(*) c2, max(b) c3 from t1 session(ts, 10s);
sql create stream streams1 trigger at_once ignore update 1 into streamt1 as select _wstart c1, count(*) c2, max(b) c3 from t1 session(ts, 10s);
sql insert into t1 values(1648791213000,1,1,1);
sql insert into t1 values(1648791213000,2,2,2);
$loop_count = 0
loop2:
sleep 300
sql select * from streamt1 order by 1,2,3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop2
endi
if $data02 != 2 then
print =====data02=$data02
goto loop2
endi
sql insert into t1 values(1648791213000,3,3,3);
$loop_count = 0
loop3:
sleep 300
sql select * from streamt1 order by 1,2,3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 3 then
print =====data01=$data01
goto loop3
endi
if $data02 != 3 then
print =====data02=$data02
goto loop3
endi
print step 2 end
print step 3 start
sql drop stream if exists streams2;
sql drop database if exists test2;
sql create database test2 vgroups 1;
sql use test2;
sql create table t1(ts timestamp, a int, b int , c int);
print create stream streams2 trigger at_once ignore update 1 into streamt2 as select _wstart c1, count(*) c2, max(b) c3 from t1 state_window(c);
sql create stream streams2 trigger at_once ignore update 1 into streamt2 as select _wstart c1, count(*) c2, max(b) c3 from t1 state_window(c);
sql insert into t1 values(1648791213000,1,1,1);
sql insert into t1 values(1648791213000,2,2,1);
$loop_count = 0
loop2:
sleep 300
sql select * from streamt2 order by 1,2,3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop2
endi
if $data02 != 2 then
print =====data02=$data02
goto loop2
endi
sql insert into t1 values(1648791213000,3,3,1);
$loop_count = 0
loop3:
sleep 300
sql select * from streamt2 order by 1,2,3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 3 then
print =====data01=$data01
goto loop3
endi
if $data02 != 3 then
print =====data02=$data02
goto loop3
endi
print step 3 end
print step 4 start
sql drop stream if exists streams3;
sql drop database if exists test3;
sql create database test3 vgroups 4;
sql use test3;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
print create stream streams3 trigger at_once ignore update 1 into streamt3 as select _wstart c1, count(*) c2, max(b) c3 from st interval(10s);
sql create stream streams3 trigger at_once ignore update 1 into streamt3 as select _wstart c1, count(*) c2, max(b) c3 from st interval(10s);
sql insert into t1 values(1648791213000,1,1,1);
sql insert into t1 values(1648791213000,2,2,2);
sql insert into t2 values(1648791213000,1,1,1);
sql insert into t2 values(1648791213000,2,2,2);
$loop_count = 0
loop0:
sleep 300
sql select * from streamt3 order by 1,2,3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 4 then
print =====data01=$data01
goto loop0
endi
if $data02 != 2 then
print =====data02=$data02
goto loop0
endi
sql insert into t1 values(1648791213000,3,3,3);
$loop_count = 0
loop1:
sleep 300
sql select * from streamt3 order by 1,2,3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 5 then
print =====data01=$data01
goto loop1
endi
if $data02 != 3 then
print =====data02=$data02
goto loop1
endi
sql insert into t2 values(1648791213000,4,4,4);
$loop_count = 0
loop1:
sleep 300
sql select * from streamt3 order by 1,2,3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 6 then
print =====data01=$data01
goto loop1
endi
if $data02 != 4 then
print =====data02=$data02
goto loop1
endi
print step 4 end
system sh/stop_dnodes.sh

View File

@ -0,0 +1,99 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from util import constant
from util.log import *
from util.cases import *
from util.sql import *
from util.common import *
from util.sqlset import *
from util.cluster import *
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.dnode_num=len(cluster.dnodes)
self.dbname = 'db_test'
self.setsql = TDSetSql()
self.stbname = f'{self.dbname}.stb'
self.rowNum = 5
self.tbnum = 10
self.ts = 1537146000000
self.binary_str = 'taosdata'
self.nchar_str = '涛思数据'
self.column_dict = {
'ts' : 'timestamp',
'col1': 'tinyint',
'col2': 'smallint',
'col3': 'int',
'col4': 'bigint',
'col5': 'tinyint unsigned',
'col6': 'smallint unsigned',
'col7': 'int unsigned',
'col8': 'bigint unsigned',
'col9': 'float',
'col10': 'double',
'col11': 'bool',
'col12': 'binary(20)',
'col13': 'nchar(20)'
}
self.replica = [1,3]
def insert_data(self,column_dict,tbname,row_num):
insert_sql = self.setsql.set_insertsql(column_dict,tbname,self.binary_str,self.nchar_str)
for i in range(row_num):
insert_list = []
self.setsql.insert_values(column_dict,i,insert_sql,insert_list,self.ts)
def prepare_data(self,dbname,stbname,column_dict,tbnum,rowNum,replica):
tag_dict = {
't0':'int'
}
tag_values = [
f'1'
]
tdSql.execute(f"create database if not exists {dbname} vgroups 1 replica {replica} ")
tdSql.execute(f'use {dbname}')
tdSql.execute(self.setsql.set_create_stable_sql(stbname,column_dict,tag_dict))
for i in range(tbnum):
tdSql.execute(f"create table {stbname}_{i} using {stbname} tags({tag_values[0]})")
self.insert_data(self.column_dict,f'{stbname}_{i}',rowNum)
def redistribute_vgroups(self,replica,stbname,tbnum,rownum):
tdSql.query('show vgroups')
vnode_id = tdSql.queryResult[0][0]
if replica == 1:
for dnode_id in range(1,self.dnode_num+1) :
tdSql.execute(f'redistribute vgroup {vnode_id} dnode {dnode_id}')
tdSql.query(f'select count(*) from {stbname}')
tdSql.checkEqual(tdSql.queryResult[0][0],tbnum*rownum)
elif replica == 3:
for dnode_id in range(1,self.dnode_num-1):
tdSql.execute(f'redistribute vgroup {vnode_id} dnode {dnode_id} dnode {dnode_id+1} dnode {dnode_id+2}')
tdSql.query(f'select count(*) from {stbname}')
tdSql.checkEqual(tdSql.queryResult[0][0],tbnum*rownum)
def run(self):
for replica in self.replica:
self.prepare_data(self.dbname,self.stbname,self.column_dict,self.tbnum,self.rowNum,replica)
self.redistribute_vgroups(replica,self.stbname,self.tbnum,self.rowNum)
tdSql.execute(f'drop database {self.dbname}')
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -31,6 +31,10 @@ class TDTestCase:
tdSql.execute('create database if not exists test;')
tdSql.execute('create table test.stb (ts timestamp, c11 int, c12 float ) TAGS(t11 int, t12 int );')
tdSql.execute('create table test.tb using test.stb TAGS (1, 1);')
# double comma insert check error
tdSql.error("insert into test.tb(ts, c11) values(now,,100)")
sql_list = list()
for i in range(5):
sql = f'insert into test.tb values (now-{i}m, {i}, {i});'

View File

@ -199,6 +199,11 @@ class TDTestCase:
tdSql.checkData(0, 2, None)
tdSql.checkData(1, 1, 1)
tdSql.checkData(1, 2, '{"k1":1,"k2":"hello"}')
tdSql.query("select * from information_schema.ins_tables where table_name = 'stt4'")
uid1 = tdSql.getData(0, 5)
uid2 = tdSql.getData(1, 5)
tdSql.checkNotEqual(uid1, uid2)
return
def checkWal1Vgroup(self):