Merge branch '3.0' into TEST/3.0/TD-30188

This commit is contained in:
Chris Zhai 2024-06-03 10:09:24 +08:00 committed by GitHub
commit 7a869abcbf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 109 additions and 75 deletions

View File

@ -190,7 +190,7 @@ T3 时刻最新事件到达T 向后推移超过了第二个窗口关闭的
TDengine 对于过期数据提供两种处理方式,由 IGNORE EXPIRED 选项指定:
1. 重新计算,即 IGNORE EXPIRED 0从 TSDB 中重新查找对应窗口的所有数据并重新计算得到最新结果
1. 增量计算,即 IGNORE EXPIRED 0。
2. 直接丢弃,即 IGNORE EXPIRED 1默认配置忽略过期数据

View File

@ -177,7 +177,7 @@ static int32_t tBufferReaderInit(SBufferReader *reader, uint32_t offset, SBuffer
}
static FORCE_INLINE int32_t tBufferGet(SBufferReader *reader, uint32_t size, void *data) {
if (reader->offset < 0 || reader->offset + size > reader->buffer->size) {
if (reader->offset + size > reader->buffer->size) {
return TSDB_CODE_OUT_OF_RANGE;
}
if (data) {

View File

@ -119,7 +119,7 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen,
#define TSDB_CHECK_CODE(CODE, LINO, LABEL) \
do { \
if ((CODE)) { \
if (TSDB_CODE_SUCCESS != (CODE)) { \
LINO = __LINE__; \
goto LABEL; \
} \

View File

@ -5616,18 +5616,36 @@ int32_t tSerializeSVKillCompactReq(void *buf, int32_t bufLen, SVKillCompactReq *
}
int32_t tDeserializeSVKillCompactReq(void *buf, int32_t bufLen, SVKillCompactReq *pReq) {
int32_t code = 0;
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tStartDecode(&decoder) < 0) {
code = TSDB_CODE_MSG_DECODE_ERROR;
goto _exit;
}
if (tDecodeI32(&decoder, &pReq->compactId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->compactId) < 0) {
code = TSDB_CODE_MSG_DECODE_ERROR;
goto _exit;
}
if (tDecodeI32(&decoder, &pReq->vgId) < 0) {
code = TSDB_CODE_MSG_DECODE_ERROR;
goto _exit;
}
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) {
code = TSDB_CODE_MSG_DECODE_ERROR;
goto _exit;
}
tEndDecode(&decoder);
_exit:
tDecoderClear(&decoder);
return 0;
return code;
}
int32_t tSerializeSAlterVnodeConfigReq(void *buf, int32_t bufLen, SAlterVnodeConfigReq *pReq) {

View File

@ -211,6 +211,11 @@ _exit:
return code;
}
static int32_t tsdbCommitCloseReader(SCommitter2 *committer) {
TARRAY2_CLEAR(committer->sttReaderArray, tsdbSttFileReaderClose);
return 0;
}
static int32_t tsdbCommitOpenReader(SCommitter2 *committer) {
int32_t code = 0;
int32_t lino = 0;
@ -255,13 +260,17 @@ static int32_t tsdbCommitOpenReader(SCommitter2 *committer) {
_exit:
if (code) {
tsdbCommitCloseReader(committer);
TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbCommitCloseReader(SCommitter2 *committer) {
TARRAY2_CLEAR(committer->sttReaderArray, tsdbSttFileReaderClose);
static int32_t tsdbCommitCloseIter(SCommitter2 *committer) {
tsdbIterMergerClose(&committer->tombIterMerger);
tsdbIterMergerClose(&committer->dataIterMerger);
TARRAY2_CLEAR(committer->tombIterArray, tsdbIterClose);
TARRAY2_CLEAR(committer->dataIterArray, tsdbIterClose);
return 0;
}
@ -335,19 +344,12 @@ static int32_t tsdbCommitOpenIter(SCommitter2 *committer) {
_exit:
if (code) {
tsdbCommitCloseIter(committer);
TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbCommitCloseIter(SCommitter2 *committer) {
tsdbIterMergerClose(&committer->tombIterMerger);
tsdbIterMergerClose(&committer->dataIterMerger);
TARRAY2_CLEAR(committer->tombIterArray, tsdbIterClose);
TARRAY2_CLEAR(committer->dataIterArray, tsdbIterClose);
return 0;
}
static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
int32_t code = 0;
int32_t lino = 0;

View File

@ -74,7 +74,7 @@ int32_t tsdbDataFileRAWReadBlockData(SDataFileRAWReader *reader, STsdbDataRAWBlo
pBlock->file.stt->level = reader->config->file.stt->level;
int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
char* encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
char *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
code = tsdbReadFile(reader->fd, pBlock->offset, pBlock->data, pBlock->dataLength, 0, encryptAlgorithm, encryptKey);
TSDB_CHECK_CODE(code, lino, _exit);
@ -130,7 +130,7 @@ static int32_t tsdbDataFileRAWWriterCloseCommit(SDataFileRAWWriter *writer, TFil
TSDB_CHECK_CODE(code, lino, _exit);
int32_t encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
char* encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
char *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
if (writer->fd) {
code = tsdbFsyncFile(writer->fd, encryptAlgorithm, encryptKey);
@ -211,13 +211,13 @@ _exit:
return code;
}
int32_t tsdbDataFileRAWWriteBlockData(SDataFileRAWWriter *writer, const STsdbDataRAWBlockHeader *pDataBlock,
int32_t encryptAlgorithm, char* encryptKey) {
int32_t tsdbDataFileRAWWriteBlockData(SDataFileRAWWriter *writer, const STsdbDataRAWBlockHeader *pDataBlock,
int32_t encryptAlgorithm, char *encryptKey) {
int32_t code = 0;
int32_t lino = 0;
code = tsdbWriteFile(writer->fd, writer->ctx->offset, (const uint8_t *)pDataBlock->data, pDataBlock->dataLength,
encryptAlgorithm, encryptKey);
encryptAlgorithm, encryptKey);
TSDB_CHECK_CODE(code, lino, _exit);
writer->ctx->offset += pDataBlock->dataLength;

View File

@ -44,7 +44,7 @@ static const struct {
};
void remove_file(const char *fname) {
taosRemoveFile(fname);
(void)taosRemoveFile(fname);
tsdbInfo("file:%s is removed", fname);
}

View File

@ -90,6 +90,11 @@ _exit:
return code;
}
static int32_t tsdbMergeFileSetEndCloseReader(SMerger *merger) {
TARRAY2_CLEAR(merger->sttReaderArr, tsdbSttFileReaderClose);
return 0;
}
static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) {
int32_t code = 0;
int32_t lino = 0;
@ -220,8 +225,10 @@ static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) {
code = tsdbSttFileReaderOpen(fobj->fname, &config, &reader);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND(merger->sttReaderArr, reader);
TSDB_CHECK_CODE(code, lino, _exit);
if ((code = TARRAY2_APPEND(merger->sttReaderArr, reader))) {
tsdbSttFileReaderClose(&reader);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
}
@ -232,6 +239,7 @@ static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) {
_exit:
if (code) {
tsdbMergeFileSetEndCloseReader(merger);
TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
}
return code;
@ -375,11 +383,6 @@ static int32_t tsdbMergeFileSetEndCloseIter(SMerger *merger) {
return 0;
}
static int32_t tsdbMergeFileSetEndCloseReader(SMerger *merger) {
TARRAY2_CLEAR(merger->sttReaderArr, tsdbSttFileReaderClose);
return 0;
}
static int32_t tsdbMergeFileSetEnd(SMerger *merger) {
int32_t code = 0;
int32_t lino = 0;

View File

@ -321,10 +321,10 @@ int32_t tDeserializeTsdbFSetPartList(void* buf, int32_t bufLen, STsdbFSetPartLis
if (tDecodeI64(&decoder, &r.minVer) < 0) goto _err;
if (tDecodeI64(&decoder, &r.maxVer) < 0) goto _err;
if (tDecodeI64(&decoder, &reserved64) < 0) goto _err;
TARRAY2_APPEND(iList, r);
if (TARRAY2_APPEND(iList, r)) goto _err;
}
}
TARRAY2_APPEND(pList, p);
if (TARRAY2_APPEND(pList, p)) goto _err;
p = NULL;
}

View File

@ -57,6 +57,20 @@ struct STsdbSnapReader {
STombBlock tombBlock[1];
};
static int32_t tsdbSnapReadFileSetCloseReader(STsdbSnapReader* reader) {
int32_t code = 0;
int32_t lino = 0;
TARRAY2_CLEAR(reader->sttReaderArr, tsdbSttFileReaderClose);
tsdbDataFileReaderClose(&reader->dataReader);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
}
return code;
}
static int32_t tsdbSnapReadFileSetOpenReader(STsdbSnapReader* reader) {
int32_t code = 0;
int32_t lino = 0;
@ -100,27 +114,16 @@ static int32_t tsdbSnapReadFileSetOpenReader(STsdbSnapReader* reader) {
code = tsdbSttFileReaderOpen(fobj->fname, &config, &sttReader);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND(reader->sttReaderArr, sttReader);
TSDB_CHECK_CODE(code, lino, _exit);
if ((code = TARRAY2_APPEND(reader->sttReaderArr, sttReader))) {
tsdbSttFileReaderClose(&sttReader);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
}
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
}
return code;
}
static int32_t tsdbSnapReadFileSetCloseReader(STsdbSnapReader* reader) {
int32_t code = 0;
int32_t lino = 0;
TARRAY2_CLEAR(reader->sttReaderArr, tsdbSttFileReaderClose);
tsdbDataFileReaderClose(&reader->dataReader);
_exit:
if (code) {
tsdbSnapReadFileSetCloseReader(reader);
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
}
return code;

View File

@ -141,6 +141,7 @@ static int32_t tsdbSnapRAWReadFileSetOpenReader(STsdbSnapRAWReader* reader) {
_exit:
if (code) {
tsdbSnapRAWReadFileSetCloseReader(reader);
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
}
return code;
@ -546,7 +547,7 @@ static int32_t tsdbSnapRAWWriteTimeSeriesData(STsdbSnapRAWWriter* writer, STsdbD
int32_t lino = 0;
int32_t encryptAlgorithm = writer->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
char* encryptKey = writer->tsdb->pVnode->config.tsdbCfg.encryptKey;
char* encryptKey = writer->tsdb->pVnode->config.tsdbCfg.encryptKey;
code = tsdbFSetRAWWriteBlockData(writer->ctx->fsetWriter, bHdr, encryptAlgorithm, encryptKey);
TSDB_CHECK_CODE(code, lino, _exit);

View File

@ -896,7 +896,7 @@ static int32_t tsdbSttFWriterCloseAbort(SSttFileWriter *writer) {
char fname[TSDB_FILENAME_LEN];
tsdbTFileName(writer->config->tsdb, writer->file, fname);
tsdbCloseFile(&writer->fd);
taosRemoveFile(fname);
(void)taosRemoveFile(fname);
return 0;
}

View File

@ -56,7 +56,8 @@ int32_t tTombBlockGet(STombBlock *tombBlock, int32_t idx, STombRecord *record) {
for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) {
SBufferReader br = BUFFER_READER_INITIALIZER(sizeof(int64_t) * idx, &tombBlock->buffers[i]);
tBufferGetI64(&br, &record->data[i]);
int32_t code = tBufferGetI64(&br, &record->data[i]);
if (code) return code;
}
return 0;
}
@ -182,7 +183,8 @@ int32_t tStatisBlockPut(STbStatisBlock *block, SRowInfo *row, int32_t maxRecords
if (block->numOfRecords > 0) {
int64_t lastUid;
SBufferReader br = BUFFER_READER_INITIALIZER(sizeof(int64_t) * (block->numOfRecords - 1), &block->uids);
tBufferGetI64(&br, &lastUid);
int32_t code = tBufferGetI64(&br, &lastUid);
if (code) return code;
if (lastUid == row->uid) {
return tStatisBlockUpdate(block, row);

View File

@ -200,7 +200,7 @@ static FORCE_INLINE void cliMayUpdateFqdnCache(SHashObj* cache, char* dst);
// process data read from server, add decompress etc later
static void cliHandleResp(SCliConn* conn);
// handle except about conn
static void cliHandleExcept(SCliConn* conn);
static void cliHandleExcept(SCliConn* conn, int32_t code);
static void cliReleaseUnfinishedMsg(SCliConn* conn);
static void cliHandleFastFail(SCliConn* pConn, int status);
@ -571,8 +571,11 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
if (T_REF_VAL_GET(pConn) > 1) transUnrefCliHandle(pConn);
transUnrefCliHandle(pConn);
}
void cliHandleExcept(SCliConn* conn) {
void cliHandleExcept(SCliConn* conn, int32_t code) {
tTrace("%s conn %p except ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
if (code != TSDB_CODE_RPC_FQDN_ERROR) {
code = -1;
}
cliHandleExceptImpl(conn, -1);
}
@ -866,7 +869,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
while (transReadComplete(pBuf)) {
tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
if (pBuf->invalid) {
cliHandleExcept(conn);
cliHandleExcept(conn, -1);
break;
} else {
cliHandleResp(conn);
@ -886,7 +889,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
tDebug("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread),
T_REF_VAL_GET(conn));
conn->broken = true;
cliHandleExcept(conn);
cliHandleExcept(conn, -1);
}
}
@ -1037,7 +1040,7 @@ static void cliSendCb(uv_write_t* req, int status) {
} else {
if (!uv_is_closing((uv_handle_t*)&pConn->stream)) {
tError("%s conn %p failed to write:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status));
cliHandleExcept(pConn);
cliHandleExcept(pConn, -1);
}
return;
}
@ -1113,7 +1116,7 @@ void cliSend(SCliConn* pConn) {
if (transQueueEmpty(&pConn->cliMsgs)) {
tError("%s conn %p not msg to send", pTransInst->label, pConn);
cliHandleExcept(pConn);
cliHandleExcept(pConn, -1);
return;
}
@ -1186,7 +1189,7 @@ void cliSend(SCliConn* pConn) {
if (status != 0) {
tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType),
uv_err_name(status));
cliHandleExcept(pConn);
cliHandleExcept(pConn, -1);
}
return;
_RETURN:
@ -1242,7 +1245,8 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer = NULL;
cliHandleFastFail(conn, -1);
cliHandleFastFail(conn, terrno);
terrno = 0;
return;
}
struct sockaddr_in addr;
@ -1304,7 +1308,7 @@ static void cliSendBatchCb(uv_write_t* req, int status) {
tDebug("%s conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn,
p->wLen, p->batchSize, uv_err_name(status));
if (!uv_is_closing((uv_handle_t*)&conn->stream)) cliHandleExcept(conn);
if (!uv_is_closing((uv_handle_t*)&conn->stream)) cliHandleExcept(conn, -1);
cliHandleBatchReq(nxtBatch, thrd);
} else {
@ -1362,7 +1366,7 @@ static void cliHandleFastFail(SCliConn* pConn, int status) {
cliDestroyBatch(pConn->pBatch);
pConn->pBatch = NULL;
}
cliHandleExcept(pConn);
cliHandleExcept(pConn, status);
}
void cliConnCb(uv_connect_t* req, int status) {
@ -1653,7 +1657,8 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer = NULL;
cliHandleExcept(conn);
cliHandleExcept(conn, terrno);
terrno = 0;
return;
}
@ -1667,20 +1672,20 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
if (fd == -1) {
tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
tstrerror(TAOS_SYSTEM_ERROR(errno)));
cliHandleExcept(conn);
cliHandleExcept(conn, -1);
errno = 0;
return;
}
int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd);
if (ret != 0) {
tGError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
cliHandleExcept(conn);
cliHandleExcept(conn, -1);
return;
}
ret = transSetConnOption((uv_tcp_t*)conn->stream, tsKeepAliveIdle);
if (ret != 0) {
tGError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
cliHandleExcept(conn);
cliHandleExcept(conn, -1);
return;
}

View File

@ -27,8 +27,8 @@
,,n,army,python3 ./test.py -f community/cmdline/fullopt.py
,,n,army,python3 ./test.py -f community/query/show.py -N 3
,,n,army,python3 ./test.py -f enterprise/alter/alterConfig.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f community/query/subquery/subqueryBugs.py -N 3 -L 3 -D 1
,,y,army,./pytest.sh python3 ./test.py -f community/storage/compressBasic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f community/query/subquery/subqueryBugs.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f community/storage/oneStageComp.py -N 3 -L 3 -D 1
,,y,army,./pytest.sh python3 ./test.py -f community/storage/compressBasic.py -N 3
#

View File

@ -436,13 +436,13 @@ class TDTestCase:
tdSql.execute(f"use db")
tdSql.execute(f"create table db.st(ts timestamp, ibv bigint, ubv bigint unsigned) tags(area int)")
# insert t1 data
tdSql.execute(f"insert into db.t1 using db.st tags(1) values(now,9223372036854775801,18446744073709551611)")
tdSql.execute(f"insert into db.t1 using db.st tags(1) values(now,8223372036854775801,17446744073709551611)")
tdSql.execute(f"insert into db.t1 using db.st tags(1) values(now,7223372036854775801,16446744073709551611)")
tdSql.execute(f"insert into db.t1 using db.st tags(1) values(now + 1s,9223372036854775801,18446744073709551611)")
tdSql.execute(f"insert into db.t1 using db.st tags(1) values(now + 2s,8223372036854775801,17446744073709551611)")
tdSql.execute(f"insert into db.t1 using db.st tags(1) values(now + 3s,7223372036854775801,16446744073709551611)")
# insert t2 data
tdSql.execute(f"insert into db.t2 using db.st tags(2) values(now,9223372036854775801,18446744073709551611)")
tdSql.execute(f"insert into db.t2 using db.st tags(2) values(now,8223372036854775801,17446744073709551611)")
tdSql.execute(f"insert into db.t2 using db.st tags(2) values(now,7223372036854775801,16446744073709551611)")
tdSql.execute(f"insert into db.t2 using db.st tags(2) values(now + 1s,9223372036854775801,18446744073709551611)")
tdSql.execute(f"insert into db.t2 using db.st tags(2) values(now + 2s,8223372036854775801,17446744073709551611)")
tdSql.execute(f"insert into db.t2 using db.st tags(2) values(now + 3s,7223372036854775801,16446744073709551611)")
# check single table answer
tdSql.query(f"select avg(ibv), avg(ubv) from db.t1")