From b64bfa979fcdac35095cedc486b76cbc6d477126 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sun, 6 Mar 2022 20:05:41 +0800 Subject: [PATCH 1/6] Feature/td 11463 3.0 (#10573) * Block-wise SMA extraction * refactor the SBlock * add method tsdbLoadBlockOffset * set method tsdbLoadBlockOffset static * refactor * trigger CI * minor change * trigger CI * add STSma defintion * add STSma schema encode/decode * restore * code optimization * put/get sma schema from bdb * add/check tSma schema methods * code optimization * code optimization --- include/common/tmsg.h | 68 ++- include/common/tmsgdef.h | 3 + include/common/trow.h | 6 +- source/dnode/vnode/inc/meta.h | 1 + source/dnode/vnode/inc/tsdb.h | 21 + source/dnode/vnode/src/inc/tsdbDef.h | 1 + source/dnode/vnode/src/inc/tsdbFS.h | 1 + source/dnode/vnode/src/inc/tsdbSma.h | 95 ++++ source/dnode/vnode/src/meta/metaBDBImpl.c | 60 ++- source/dnode/vnode/src/tsdb/tsdbSma.c | 550 ++++++++++++++++++++++ source/dnode/vnode/src/tsdb/tsdbWrite.c | 41 ++ source/dnode/vnode/src/vnd/vnodeWrite.c | 9 + source/dnode/vnode/test/tsdbSmaTest.cpp | 19 +- 13 files changed, 852 insertions(+), 23 deletions(-) create mode 100644 source/dnode/vnode/src/inc/tsdbSma.h create mode 100644 source/dnode/vnode/src/tsdb/tsdbSma.c diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d1e02af287..65860d4959 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1866,10 +1866,59 @@ typedef struct { uint64_t tableUid; // super/common table uid int64_t interval; int64_t sliding; - col_id_t* colIds; // N.B. sorted column ids - uint16_t* funcIds; // N.B. sorted sma function ids + col_id_t* colIds; // sorted column ids + uint16_t* funcIds; // sorted sma function ids } STSma; // Time-range-wise SMA +typedef struct { + int8_t msgType; // 0 create, 1 recreate + STSma tSma; + STimeWindow window; +} SCreateTSmaMsg; + +typedef struct { + STimeWindow window; + char indexName[TSDB_INDEX_NAME_LEN + 1]; +} SDropTSmaMsg; + +typedef struct { + STimeWindow tsWindow; // [skey, ekey] + uint64_t tableUid; // sub/common table uid + int32_t numOfBlocks; // number of sma blocks for each column, total number is numOfBlocks*numOfColId + int32_t dataLen; // total data length + col_id_t* colIds; // e.g. 2,4,9,10 + col_id_t numOfColIds; // e.g. 4 + char data[]; // the sma blocks +} STSmaData; + +// TODO: move to the final location afte schema of STSma/STSmaData defined +static FORCE_INLINE void tdDestroySmaData(STSmaData* pSmaData) { + if (pSmaData) { + if (pSmaData->colIds) { + tfree(pSmaData->colIds); + } + tfree(pSmaData); + } +} + +// RSma: Time-range-wise Rollup SMA +// TODO: refactor when rSma grammar defined finally => +typedef struct { + int64_t interval; + int32_t retention; // unit: day + uint16_t days; // unit: day + int8_t intervalUnit; +} SSmaParams; +// TODO: refactor when rSma grammar defined finally <= + +typedef struct { + // TODO: refactor to use the real schema => + STSma tsma; + float xFilesFactor; + SArray* smaParams; // SSmaParams + // TODO: refactor to use the real schema <= +} SRSma; + typedef struct { uint32_t number; STSma* tSma; @@ -1885,12 +1934,17 @@ static FORCE_INLINE void tdDestroyTSma(STSma* pSma, bool releaseSelf) { } } -static FORCE_INLINE void tdDestroyTSmaWrapper(STSmaWrapper* pSW) { - if (pSW && pSW->tSma) { - for (uint32_t i = 0; i < pSW->number; ++i) { - tdDestroyTSma(pSW->tSma + i, false); +static FORCE_INLINE void tdDestroyTSmaWrapper(STSmaWrapper* pSW, bool releaseSelf) { + if (pSW) { + if (pSW->tSma) { + for (uint32_t i = 0; i < pSW->number; ++i) { + tdDestroyTSma(pSW->tSma + i, false); + } + tfree(pSW->tSma); + } + if (releaseSelf) { + free(pSW); } - tfree(pSW->tSma); } } diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index cae186ba16..0f0c4729bc 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -184,6 +184,9 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp) + TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) // Requests handled by QNODE TD_NEW_MSG_SEG(TDMT_QND_MSG) diff --git a/include/common/trow.h b/include/common/trow.h index 49bc2f3515..ef30796d78 100644 --- a/include/common/trow.h +++ b/include/common/trow.h @@ -118,6 +118,8 @@ typedef struct { } SKvRow; typedef struct { + /// timestamp + TSKEY ts; union { /// union field for encode and decode uint32_t info; @@ -138,8 +140,6 @@ typedef struct { uint32_t len; /// row version uint64_t ver; - /// timestamp - TSKEY ts; /// the inline data, maybe a tuple or a k-v tuple char data[]; } STSRow; @@ -173,7 +173,7 @@ typedef struct { #define TD_ROW_DATA(r) ((r)->data) #define TD_ROW_LEN(r) ((r)->len) #define TD_ROW_KEY(r) ((r)->ts) -#define TD_ROW_KEY_ADDR(r) POINTER_SHIFT((r), 16) +#define TD_ROW_KEY_ADDR(r) (r) // N.B. If without STSchema, getExtendedRowSize() is used to get the rowMaxBytes and // (int32_t)ceil((double)nCols/TD_VTYPE_PARTS) should be added if TD_SUPPORT_BITMAP defined. diff --git a/source/dnode/vnode/inc/meta.h b/source/dnode/vnode/inc/meta.h index e5ad43a4ee..b20be691ef 100644 --- a/source/dnode/vnode/inc/meta.h +++ b/source/dnode/vnode/inc/meta.h @@ -57,6 +57,7 @@ STbCfg * metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid); SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline); STSchema * metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver); SSmaCfg * metaGetSmaInfoByName(SMeta *pMeta, const char *indexName); +STSmaWrapper * metaGetSmaInfoByUid(SMeta *pMeta, tb_uid_t uid); SMTbCursor *metaOpenTbCursor(SMeta *pMeta); void metaCloseTbCursor(SMTbCursor *pTbCur); diff --git a/source/dnode/vnode/inc/tsdb.h b/source/dnode/vnode/inc/tsdb.h index 6bc89ddd66..5513742c73 100644 --- a/source/dnode/vnode/inc/tsdb.h +++ b/source/dnode/vnode/inc/tsdb.h @@ -87,6 +87,27 @@ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp); int tsdbPrepareCommit(STsdb *pTsdb); int tsdbCommit(STsdb *pTsdb); +/** + * @brief Insert tSma(Time-range-wise SMA) data from stream computing engine + * + * @param pTsdb + * @param param + * @param pData + * @return int32_t + */ +int32_t tsdbInsertTSmaData(STsdb *pTsdb, STSma *param, STSmaData *pData); + +/** + * @brief Insert RSma(Time-range-wise Rollup SMA) data. + * + * @param pTsdb + * @param param + * @param pData + * @return int32_t + */ +int32_t tsdbInsertRSmaData(STsdb *pTsdb, SRSma *param, STSmaData *pData); + + // STsdbCfg int tsdbOptionsInit(STsdbCfg *); void tsdbOptionsClear(STsdbCfg *); diff --git a/source/dnode/vnode/src/inc/tsdbDef.h b/source/dnode/vnode/src/inc/tsdbDef.h index 98c0de32a8..96a76ea7d4 100644 --- a/source/dnode/vnode/src/inc/tsdbDef.h +++ b/source/dnode/vnode/src/inc/tsdbDef.h @@ -35,6 +35,7 @@ #include "tsdbMemory.h" #include "tsdbOptions.h" #include "tsdbReadImpl.h" +#include "tsdbSma.h" #ifdef __cplusplus extern "C" { diff --git a/source/dnode/vnode/src/inc/tsdbFS.h b/source/dnode/vnode/src/inc/tsdbFS.h index 71f35a9eca..641255a294 100644 --- a/source/dnode/vnode/src/inc/tsdbFS.h +++ b/source/dnode/vnode/src/inc/tsdbFS.h @@ -42,6 +42,7 @@ typedef struct { typedef struct { STsdbFSMeta meta; // FS meta SArray * df; // data file array + SArray * smaf; // sma data file array } SFSStatus; typedef struct { diff --git a/source/dnode/vnode/src/inc/tsdbSma.h b/source/dnode/vnode/src/inc/tsdbSma.h new file mode 100644 index 0000000000..2a326eece8 --- /dev/null +++ b/source/dnode/vnode/src/inc/tsdbSma.h @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_TSDB_SMA_H_ +#define _TD_TSDB_SMA_H_ + +// insert/update interface +int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData); +int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, SRSma *param, STSmaData *pData); + + +// query interface +// TODO: This is the basic params, and should wrap the params to a queryHandle. +int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeWindow *queryWin, int32_t nMaxResult); + +// management interface +int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void* result); +int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin); + + + + +// internal func +static FORCE_INLINE int32_t tsdbEncodeTSmaKey(uint64_t tableUid, col_id_t colId, TSKEY tsKey, void **pData) { + int32_t len = 0; + len += taosEncodeFixedU64(pData, tableUid); + len += taosEncodeFixedU16(pData, colId); + len += taosEncodeFixedI64(pData, tsKey); + return len; +} + +#if 0 + +typedef struct { + int minFid; + int midFid; + int maxFid; + TSKEY minKey; +} SRtn; + +typedef struct { + uint64_t uid; + int64_t offset; + int64_t size; +} SKVRecord; + +void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn); + +static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) { + if (key < 0) { + return (int)((key + 1) / tsTickPerDay[precision] / days - 1); + } else { + return (int)((key / tsTickPerDay[precision] / days)); + } +} + +static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) { + if (fid >= pRtn->maxFid) { + return 0; + } else if (fid >= pRtn->midFid) { + return 1; + } else if (fid >= pRtn->minFid) { + return 2; + } else { + return -1; + } +} + +#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5) + +int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord); +void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord); +void *tsdbCommitData(STsdbRepo *pRepo); +int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn); +int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, SBlockIdx *pIdx); +int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf); +int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, + bool isLast, bool isSuper, void **ppBuf, void **ppCBuf); +int tsdbApplyRtn(STsdbRepo *pRepo); + +#endif + +#endif /* _TD_TSDB_SMA_H_ */ \ No newline at end of file diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index c31f28d983..03a2937679 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -833,6 +833,7 @@ SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) { } pCur->uid = uid; + // TODO: lock? ret = pDB->pCtbIdx->cursor(pDB->pSmaIdx, NULL, &(pCur->pCur), 0); if (ret != 0) { free(pCur); @@ -852,25 +853,68 @@ void metaCloseSmaCurosr(SMSmaCursor *pCur) { } } -const char* metaSmaCursorNext(SMSmaCursor *pCur) { - DBT skey = {0}; - DBT pkey = {0}; - DBT pval = {0}; - void *pBuf; +const char *metaSmaCursorNext(SMSmaCursor *pCur) { + DBT skey = {0}; + DBT pkey = {0}; + DBT pval = {0}; // Set key skey.data = &(pCur->uid); skey.size = sizeof(pCur->uid); - + // TODO: lock? if (pCur->pCur->pget(pCur->pCur, &skey, &pkey, &pval, DB_NEXT) == 0) { - const char* indexName = (const char *)pkey.data; + const char *indexName = (const char *)pkey.data; assert(indexName != NULL); return indexName; } else { - return 0; + return NULL; } } +STSmaWrapper *metaGetSmaInfoByUid(SMeta *pMeta, tb_uid_t uid) { + STSmaWrapper *pSW = NULL; + + pSW = calloc(sizeof(*pSW), 1); + if (pSW == NULL) { + return NULL; + } + + SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid); + if (pCur == NULL) { + free(pSW); + return NULL; + } + + DBT skey = {.data = &(pCur->uid)}; + DBT pval = {.size = sizeof(pCur->uid)}; + void *pBuf = NULL; + + while (true) { + // TODO: lock? + if (pCur->pCur->pget(pCur->pCur, &skey, NULL, &pval, DB_NEXT) == 0) { + ++pSW->number; + STSma *tptr = (STSma *)realloc(pSW->tSma, pSW->number * sizeof(STSma)); + if (tptr == NULL) { + metaCloseSmaCurosr(pCur); + tdDestroyTSmaWrapper(pSW, true); + return NULL; + } + pSW->tSma = tptr; + pBuf = pval.data; + if (tDecodeTSma(pBuf, pSW->tSma + pSW->number - 1) == NULL) { + metaCloseSmaCurosr(pCur); + tdDestroyTSmaWrapper(pSW, true); + return NULL; + } + continue; + } + break; + } + + metaCloseSmaCurosr(pCur); + + return pSW; +} static void metaDBWLock(SMetaDB *pDB) { #if IMPL_WITH_LOCK diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c new file mode 100644 index 0000000000..b465dc3a88 --- /dev/null +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -0,0 +1,550 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tsdbDef.h" + +#define SMA_STORAGE_TSDB_DAYS 30 +#define SMA_STORAGE_SPLIT_HOURS 24 +#define SMA_KEY_LEN 18 // tableUid_colId_TSKEY 8+2+8 + +#define SMA_STORE_SINGLE_BLOCKS // store SMA data by single block or multiple blocks + +typedef enum { + SMA_STORAGE_LEVEL_TSDB = 0, // store TSma in dir e.g. vnode${N}/tsdb/.tsma + SMA_STORAGE_LEVEL_DFILESET = 1 // store TSma in file e.g. vnode${N}/tsdb/v2f1900.tsma.${sma_index_name} +} ESmaStorageLevel; + +typedef struct { + STsdb * pTsdb; + char * pDFile; // TODO: use the real DFile type, not char* + int32_t interval; // interval with the precision of DB + int32_t blockSize; // size of SMA block item + // TODO +} STSmaWriteH; + +typedef struct { + int32_t iter; +} SmaFsIter; +typedef struct { + STsdb * pTsdb; + char * pDFile; // TODO: use the real DFile type, not char* + int32_t interval; // interval with the precision of DB + int32_t blockSize; // size of SMA block item + int8_t storageLevel; + int8_t days; + SmaFsIter smaFsIter; + // TODO +} STSmaReadH; + +// declaration of static functions +static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData); +static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData); +static int32_t tsdbJudgeStorageLevel(int64_t interval, int8_t intervalUnit); +static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaData *pData, int32_t sectionDataLen, int32_t nBlocks); +static int32_t tsdbInsertTSmaBlocks(void *bTree, const char *smaKey, const char *pData, int32_t dataLen); +static int32_t tsdbTSmaDataSplit(STSmaWriteH *pSmaH, STSma *param, STSmaData *pData, int32_t days, int32_t nOffset, + int32_t fid, int32_t *nSmaBlocks); +static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision); +static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSma *param, STSmaData *pData, int32_t storageLevel, + int32_t fid); + +static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData); +static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *queryWin); +static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *queryWin); + +/** + * @brief Judge the tSma storage level + * + * @param interval + * @param intervalUnit + * @return int32_t + */ +static int32_t tsdbJudgeStorageLevel(int64_t interval, int8_t intervalUnit) { + // TODO: configurable for SMA_STORAGE_SPLIT_HOURS? + switch (intervalUnit) { + case TD_TIME_UNIT_HOUR: + if (interval < SMA_STORAGE_SPLIT_HOURS) { + return SMA_STORAGE_LEVEL_DFILESET; + } + break; + case TD_TIME_UNIT_MINUTE: + if (interval < 60 * SMA_STORAGE_SPLIT_HOURS) { + return SMA_STORAGE_LEVEL_DFILESET; + } + break; + case TD_TIME_UNIT_SEC: + if (interval < 3600 * SMA_STORAGE_SPLIT_HOURS) { + return SMA_STORAGE_LEVEL_DFILESET; + } + break; + case TD_TIME_UNIT_MILLISEC: + if (interval < 3600 * 1e3 * SMA_STORAGE_SPLIT_HOURS) { + return SMA_STORAGE_LEVEL_DFILESET; + } + break; + case TD_TIME_UNIT_MICROSEC: + if (interval < 3600 * 1e6 * SMA_STORAGE_SPLIT_HOURS) { + return SMA_STORAGE_LEVEL_DFILESET; + } + break; + case TD_TIME_UNIT_NANOSEC: + if (interval < 3600 * 1e9 * SMA_STORAGE_SPLIT_HOURS) { + return SMA_STORAGE_LEVEL_DFILESET; + } + break; + default: + break; + } + return SMA_STORAGE_LEVEL_TSDB; +} + +/** + * @brief Insert TSma data blocks to B+Tree + * + * @param bTree + * @param smaKey + * @param pData + * @param dataLen + * @return int32_t + */ +static int32_t tsdbInsertTSmaBlocks(void *bTree, const char *smaKey, const char *pData, int32_t dataLen) { + // TODO: insert sma data blocks into B+Tree + printf("insert sma data blocks into B+Tree: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", dataLen %d\n", + *(uint64_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8), *(int64_t *)POINTER_SHIFT(smaKey, 10), dataLen); + return TSDB_CODE_SUCCESS; +} + +static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision) { + if (intervalUnit < TD_TIME_UNIT_MILLISEC) { + switch (intervalUnit) { + case TD_TIME_UNIT_YEAR: + case TD_TIME_UNIT_SEASON: + case TD_TIME_UNIT_MONTH: + case TD_TIME_UNIT_WEEK: + // illegal time unit + tsdbError("invalid interval unit: %d\n", intervalUnit); + TASSERT(0); + break; + case TD_TIME_UNIT_DAY: // the interval for tSma calculation must <= day + interval *= 86400 * 1e3; + break; + case TD_TIME_UNIT_HOUR: + interval *= 3600 * 1e3; + break; + case TD_TIME_UNIT_MINUTE: + interval *= 60 * 1e3; + break; + case TD_TIME_UNIT_SEC: + interval *= 1e3; + break; + default: + break; + } + } + + switch (intervalUnit) { + case TD_TIME_UNIT_MILLISEC: + if (TSDB_TIME_PRECISION_MILLI == precision) { + return interval; + } else if (TSDB_TIME_PRECISION_MICRO == precision) { + return interval * 1e3; + } else { // nano second + return interval * 1e6; + } + break; + case TD_TIME_UNIT_MICROSEC: + if (TSDB_TIME_PRECISION_MILLI == precision) { + return interval / 1e3; + } else if (TSDB_TIME_PRECISION_MICRO == precision) { + return interval; + } else { // nano second + return interval * 1e3; + } + break; + case TD_TIME_UNIT_NANOSEC: + if (TSDB_TIME_PRECISION_MILLI == precision) { + return interval / 1e6; + } else if (TSDB_TIME_PRECISION_MICRO == precision) { + return interval / 1e3; + } else { // nano second + return interval; + } + break; + default: + if (TSDB_TIME_PRECISION_MILLI == precision) { + return interval * 1e3; + } else if (TSDB_TIME_PRECISION_MICRO == precision) { + return interval * 1e6; + } else { // nano second + return interval * 1e9; + } + break; + } + return interval; +} + +/** + * @brief Split the TSma data blocks into expected size and insert into B+Tree. + * + * @param pSmaH + * @param pData + * @param nOffset The nOffset of blocks since fid changes. + * @param nBlocks The nBlocks with the same fid since nOffset. + * @return int32_t + */ +static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaData *pData, int32_t nOffset, int32_t nBlocks) { + STsdb *pTsdb = pSmaH->pTsdb; + + TASSERT(pData->colIds != NULL); + + tsdbDebug("tsdbInsertTSmaDataSection: nOffset %d, nBlocks %d", nOffset, nBlocks); + printf("tsdbInsertTSmaDataSection: nOffset %d, nBlocks %d\n", nOffset, nBlocks); + + int32_t colDataLen = pData->dataLen / pData->numOfColIds; + int32_t sectionDataLen = pSmaH->blockSize * nBlocks; + + for (col_id_t i = 0; i < pData->numOfColIds; ++i) { + // param: pointer of B+Tree, key, value, dataLen + void *bTree = pSmaH->pDFile; +#ifndef SMA_STORE_SINGLE_BLOCKS + // save tSma data blocks as a whole + char smaKey[SMA_KEY_LEN] = {0}; + void *pSmaKey = &smaKey; + tsdbEncodeTSmaKey(pData->tableUid, *(pData->colIds + i), pData->tsWindow.skey + nOffset * pSmaH->interval, + (void **)&pSmaKey); + if (tsdbInsertTSmaBlocks(bTree, smaKey, pData->data + i * colDataLen + nOffset * pSmaH->blockSize, sectionDataLen) < + 0) { + tsdbWarn("vgId:%d insert tSma blocks failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); + } +#else + // save tSma data blocks separately + for (int32_t n = 0; n < nBlocks; ++n) { + char smaKey[SMA_KEY_LEN] = {0}; + void *pSmaKey = &smaKey; + tsdbEncodeTSmaKey(pData->tableUid, *(pData->colIds + i), pData->tsWindow.skey + (nOffset + n) * pSmaH->interval, + (void **)&pSmaKey); + if (tsdbInsertTSmaBlocks(bTree, smaKey, pData->data + i * colDataLen + (nOffset + n) * pSmaH->blockSize, + pSmaH->blockSize) < 0) { + tsdbWarn("vgId:%d insert tSma blocks failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); + } + } +#endif + } + return TSDB_CODE_SUCCESS; +} + +static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData) { + pSmaH->pTsdb = pTsdb; + pSmaH->interval = tsdbGetIntervalByPrecision(param->interval, param->intervalUnit, REPO_CFG(pTsdb)->precision); + pSmaH->blockSize = param->numOfFuncIds * sizeof(int64_t); +} + +static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSma *param, STSmaData *pData, int32_t storageLevel, + int32_t fid) { + // TODO + pSmaH->pDFile = "tSma_interval_file_name"; + + return TSDB_CODE_SUCCESS; +} /** + * @brief Split the sma data blocks by fid. + * + * @param pSmaH + * @param param + * @param pData + * @param nOffset + * @param fid + * @param nSmaBlocks + * @return int32_t + */ +static int32_t tsdbTSmaDataSplit(STSmaWriteH *pSmaH, STSma *param, STSmaData *pData, int32_t days, int32_t nOffset, + int32_t fid, int32_t *nSmaBlocks) { + STsdbCfg *pCfg = REPO_CFG(pSmaH->pTsdb); + + // TODO: use binary search + for (int32_t n = nOffset + 1; n < pData->numOfBlocks; ++n) { + // TODO: The tsWindow.skey should use the precision of DB. + int32_t tFid = (int32_t)(TSDB_KEY_FID(pData->tsWindow.skey + pSmaH->interval * n, days, pCfg->precision)); + if (tFid > fid) { + *nSmaBlocks = n - nOffset; + break; + } + } + return TSDB_CODE_SUCCESS; +} + +/** + * @brief Insert/Update Time-range-wise SMA data. + * - If interval < SMA_STORAGE_SPLIT_HOURS(e.g. 24), save the SMA data as a part of DFileSet to e.g. + * v3f1900.tsma.${sma_index_name}. The days is the same with that for TS data files. + * - If interval >= SMA_STORAGE_SPLIT_HOURS, save the SMA data to e.g. vnode3/tsma/v3f632.tsma.${sma_index_name}. The + * days is 30 times of the interval, and the minimum days is SMA_STORAGE_TSDB_DAYS(30d). + * - The destination file of one data block for some interval is determined by its start TS key. + * + * @param pTsdb + * @param param + * @param pData + * @return int32_t + */ +int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData) { + STsdbCfg * pCfg = REPO_CFG(pTsdb); + STSmaData * curData = pData; + STSmaWriteH tSmaH = {0}; + + tsdbInitTSmaWriteH(&tSmaH, pTsdb, param, pData); + + if (pData->numOfBlocks <= 0 || pData->numOfColIds <= 0 || pData->dataLen <= 0) { + TASSERT(0); + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } + + // Step 1: Judge the storage level + int32_t storageLevel = tsdbJudgeStorageLevel(param->interval, param->intervalUnit); + int32_t daysPerFile = storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : pCfg->daysPerFile; + + // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file + // - Set and open the DFile or the B+Tree file + + int32_t minFid = (int32_t)(TSDB_KEY_FID(pData->tsWindow.skey, daysPerFile, pCfg->precision)); + int32_t maxFid = (int32_t)(TSDB_KEY_FID(pData->tsWindow.ekey, daysPerFile, pCfg->precision)); + + if (minFid == maxFid) { + // Save all the TSma data to one file + // TODO: tsdbStartTSmaCommit(); + tsdbSetTSmaDataFile(&tSmaH, param, pData, storageLevel, minFid); + tsdbInsertTSmaDataSection(&tSmaH, pData, 0, pData->numOfBlocks); + // TODO:tsdbEndTSmaCommit(); + } else if (minFid < maxFid) { + // Split the TSma data and save to multiple files. As there is limit for the span, it can't span more than 2 files + // actually. + // TODO: tsdbStartTSmaCommit(); + int32_t tFid = minFid; + int32_t nOffset = 0; + int32_t nSmaBlocks = 0; + do { + tsdbTSmaDataSplit(&tSmaH, param, pData, daysPerFile, nOffset, tFid, &nSmaBlocks); + tsdbSetTSmaDataFile(&tSmaH, param, pData, storageLevel, tFid); + if (tsdbInsertTSmaDataSection(&tSmaH, pData, nOffset, nSmaBlocks) < 0) { + return terrno; + } + + ++tFid; + nOffset += nSmaBlocks; + + if (tFid == maxFid) { + tsdbSetTSmaDataFile(&tSmaH, param, pData, storageLevel, tFid); + tsdbInsertTSmaDataSection(&tSmaH, pData, nOffset, pData->numOfBlocks - nOffset); + break; + } + } while (true); + + // TODO:tsdbEndTSmaCommit(); + } else { + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, SRSma *param, STSmaData *pData, int32_t fid) { + // TODO + pSmaH->pDFile = "rSma_interval_file_name"; + + return TSDB_CODE_SUCCESS; +} + +int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, SRSma *param, STSmaData *pData) { + STsdbCfg * pCfg = REPO_CFG(pTsdb); + STSma * tParam = ¶m->tsma; + STSmaData * curData = pData; + STSmaWriteH tSmaH = {0}; + + tsdbInitTSmaWriteH(&tSmaH, pTsdb, tParam, pData); + + int32_t nSmaBlocks = pData->numOfBlocks; + int32_t colDataLen = pData->dataLen / nSmaBlocks; + + // Step 2.2: Storage of SMA_STORAGE_LEVEL_DFILESET + // TODO: Use the daysPerFile for rSma data, not for TS data. + // TODO: The lifecycle of rSma data should be processed like the TS data files. + int32_t minFid = (int32_t)(TSDB_KEY_FID(pData->tsWindow.skey, pCfg->daysPerFile, pCfg->precision)); + int32_t maxFid = (int32_t)(TSDB_KEY_FID(pData->tsWindow.ekey, pCfg->daysPerFile, pCfg->precision)); + + if (minFid == maxFid) { + // Save all the TSma data to one file + tsdbSetRSmaDataFile(&tSmaH, param, pData, minFid); + // TODO: tsdbStartTSmaCommit(); + tsdbInsertTSmaDataSection(&tSmaH, pData, colDataLen, nSmaBlocks); + // TODO:tsdbEndTSmaCommit(); + } else if (minFid < maxFid) { + // Split the TSma data and save to multiple files. As there is limit for the span, it can't span more than 2 files + // actually. + // TODO: tsdbStartTSmaCommit(); + int32_t tmpFid = 0; + int32_t step = 0; + for (int32_t n = 0; n < pData->numOfBlocks; ++n) { + } + tsdbInsertTSmaDataSection(&tSmaH, pData, colDataLen, nSmaBlocks); + // TODO:tsdbEndTSmaCommit(); + } else { + TASSERT(0); + return TSDB_CODE_INVALID_PARA; + } + // Step 4: finish + return TSDB_CODE_SUCCESS; +} + +/** + * @brief Init of tSma ReadH + * + * @param pSmaH + * @param pTsdb + * @param param + * @param pData + * @return int32_t + */ +static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData) { + pSmaH->pTsdb = pTsdb; + pSmaH->interval = tsdbGetIntervalByPrecision(param->interval, param->intervalUnit, REPO_CFG(pTsdb)->precision); + pSmaH->blockSize = param->numOfFuncIds * sizeof(int64_t); +} + +/** + * @brief Init of tSma FS + * + * @param pReadH + * @param param + * @param queryWin + * @return int32_t + */ +static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *queryWin) { + int32_t storageLevel = tsdbJudgeStorageLevel(param->interval, param->intervalUnit); + int32_t daysPerFile = + storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : REPO_CFG(pReadH->pTsdb)->daysPerFile; + pReadH->storageLevel = storageLevel; + pReadH->days = daysPerFile; + pReadH->smaFsIter.iter = 0; +} + +/** + * @brief Set and open tSma file if it has key locates in queryWin. + * + * @param pReadH + * @param param + * @param queryWin + * @return true + * @return false + */ +static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *queryWin) { + SArray *smaFs = pReadH->pTsdb->fs->cstatus->smaf; + int32_t nSmaFs = taosArrayGetSize(smaFs); + + pReadH->pDFile = NULL; + + while (pReadH->smaFsIter.iter < nSmaFs) { + void *pSmaFile = taosArrayGet(smaFs, pReadH->smaFsIter.iter); + if (pSmaFile) { // match(indexName, queryWindow) + // TODO: select the file by index_name ... + pReadH->pDFile = pSmaFile; + ++pReadH->smaFsIter.iter; + break; + } + ++pReadH->smaFsIter.iter; + } + + if (pReadH->pDFile != NULL) { + tsdbDebug("vg%d: smaFile %s matched", REPO_ID(pReadH->pTsdb), "[pSmaFile dir]"); + return true; + } + + return false; +} + +/** + * @brief Return the data between queryWin and fill the pData. + * + * @param pTsdb + * @param param + * @param pData + * @param queryWin + * @param nMaxResult The query invoker should control the nMaxResult need to return to avoid OOM. + * @return int32_t + */ +int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeWindow *queryWin, int32_t nMaxResult) { + STSmaReadH tReadH = {0}; + tsdbInitTSmaReadH(&tReadH, pTsdb, param, pData); + + tsdbInitTSmaFile(&tReadH, param, queryWin); + + int32_t nResult = 0; + int64_t lastKey = 0; + + while (true) { + if (nResult >= nMaxResult) { + break; + } + + // set and open the file according to the STSma param + if (tsdbSetAndOpenTSmaFile(&tReadH, param, queryWin)) { + char bTree[100] = "\0"; + while (strncmp(bTree, "has more nodes", 100) == 0) { + if (nResult >= nMaxResult) { + break; + } + // tsdbGetDataFromBTree(bTree, queryWin, lastKey) + // fill the pData + ++nResult; + } + } + } + + // read data from file and fill the result + return TSDB_CODE_SUCCESS; +} + +/** + * @brief Get the start TS key of the last data block of one interval/sliding. + * + * @param pTsdb + * @param param + * @param result + * @return int32_t + * 1) Return 0 and fill the result if the check procedure is normal; + * 2) Return -1 if error occurs during the check procedure. + */ +int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result) { + const char *procedure = ""; + if (strncmp(procedure, "get the start TS key of the last data block", 100) != 0) { + return -1; + } + // fill the result + return TSDB_CODE_SUCCESS; +} + +/** + * @brief Remove the tSma data files related to param between pWin. + * + * @param pTsdb + * @param param + * @param pWin + * @return int32_t + */ +int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin) { + // for ("tSmaFiles of param-interval-sliding between pWin") { + // // remove the tSmaFile + // } + return TSDB_CODE_SUCCESS; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index 78067f8f83..ba8eea809e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -15,6 +15,14 @@ #include "tsdbDef.h" +/** + * @brief insert TS data + * + * @param pTsdb + * @param pMsg + * @param pRsp + * @return int + */ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp) { // Check if mem is there. If not, create one. if (pTsdb->mem == NULL) { @@ -24,4 +32,37 @@ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp) { } } return tsdbMemTableInsert(pTsdb, pTsdb->mem, pMsg, NULL); +} + +/** + * @brief Insert/Update tSma(Time-range-wise SMA) data from stream computing engine + * + * @param pTsdb + * @param param + * @param pData + * @return int32_t + * TODO: Who is responsible for resource allocate and release? + */ +int32_t tsdbInsertTSmaData(STsdb *pTsdb, STSma *param, STSmaData *pData) { + int32_t code = TSDB_CODE_SUCCESS; + if ((code = tsdbInsertTSmaDataImpl(pTsdb, param, pData)) < 0) { + tsdbWarn("vgId:%d insert tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); + } + return code; +} + +/** + * @brief Insert Time-range-wise Rollup Sma(RSma) data + * + * @param pTsdb + * @param param + * @param pData + * @return int32_t + */ +int32_t tsdbInsertRSmaData(STsdb *pTsdb, SRSma *param, STSmaData *pData) { + int32_t code = TSDB_CODE_SUCCESS; + if ((code = tsdbInsertRSmaDataImpl(pTsdb, param, pData)) < 0) { + tsdbWarn("vgId:%d insert rSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); + } + return code; } \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index a2616307ff..f3f21dc9c0 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -132,6 +132,15 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { if (tqProcessRebReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { } } break; + case TDMT_VND_CREATE_SMA: { // timeRangeSMA + // 1. tdCreateSmaMeta(pVnode->pMeta,...); + // 2. tdCreateSmaDataInit(); + // 3. tdCreateSmaData + } break; + case TDMT_VND_CANCEL_SMA: { // timeRangeSMA + } break; + case TDMT_VND_DROP_SMA: { // timeRangeSMA + } break; default: ASSERT(0); break; diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index 986986aa70..97157fc49c 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -95,7 +95,7 @@ TEST(testCase, tSmaEncodeDecodeTest) { // resource release tdDestroyTSma(&tSma, false); - tdDestroyTSmaWrapper(&dstTSmaWrapper); + tdDestroyTSmaWrapper(&dstTSmaWrapper, false); } TEST(testCase, tSma_DB_Put_Get_Del_Test) { @@ -161,7 +161,7 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) { EXPECT_EQ(qSmaCfg->interval, tSma.interval); tdDestroyTSma(qSmaCfg, true); - // get value by table uid + // get index name by table uid SMSmaCursor *pSmaCur = metaOpenSmaCursor(pMeta, tbUid); assert(pSmaCur != NULL); uint32_t indexCnt = 0; @@ -176,6 +176,15 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) { EXPECT_EQ(indexCnt, 2); metaCloseSmaCurosr(pSmaCur); + // get wrapper by table uid + STSmaWrapper *pSW = metaGetSmaInfoByUid(pMeta, tbUid); + assert(pSW != NULL); + EXPECT_EQ(pSW->number, 2); + EXPECT_STRCASEEQ(pSW->tSma->indexName, smaIndexName1); + EXPECT_EQ(pSW->tSma->tableUid, tSma.tableUid); + EXPECT_STRCASEEQ((pSW->tSma + 1)->indexName, smaIndexName2); + EXPECT_EQ((pSW->tSma + 1)->tableUid, tSma.tableUid); + // resource release metaRemoveSmaFromDb(pMeta, smaIndexName1); metaRemoveSmaFromDb(pMeta, smaIndexName2); @@ -197,15 +206,15 @@ TEST(testCase, tSmaInsertTest) { int32_t blockSize = tSma.numOfFuncIds * sizeof(int64_t); int32_t numOfColIds = 3; - int32_t numOfSmaBlocks = 10; + int32_t numOfBlocks = 10; - int32_t dataLen = numOfColIds * numOfSmaBlocks * blockSize; + int32_t dataLen = numOfColIds * numOfBlocks * blockSize; pSmaData = (STSmaData*)malloc(sizeof(STSmaData) + dataLen); ASSERT_EQ(pSmaData != NULL, true); pSmaData->tableUid = 3232329230; pSmaData->numOfColIds = numOfColIds; - pSmaData->numOfSmaBlocks = numOfSmaBlocks; + pSmaData->numOfBlocks = numOfBlocks; pSmaData->dataLen = dataLen; pSmaData->tsWindow.skey = 1640000000; pSmaData->tsWindow.ekey = 1645788649; From 2c12354c2c064d8bbc551c72a260fe11ac5026c8 Mon Sep 17 00:00:00 2001 From: afwerar <1296468573@qq.com> Date: Mon, 7 Mar 2022 02:07:23 +0800 Subject: [PATCH 2/6] [TD-13760]: libuv replace socket. --- include/os/osFile.h | 1 - include/os/osSocket.h | 15 +++++- source/libs/transport/inc/transComm.h | 2 - source/libs/transport/inc/transportInt.h | 2 - source/libs/transport/src/rpcTcp.c | 2 + source/libs/transport/src/rpcUdp.c | 3 ++ source/libs/transport/src/transSrv.c | 2 +- source/os/src/osFile.c | 45 +++++++++++------ source/os/src/osSocket.c | 46 +++++++++++++---- source/util/CMakeLists.txt | 6 +++ source/util/src/thttp.c | 63 ++++++++++++++++++++++++ 11 files changed, 156 insertions(+), 31 deletions(-) diff --git a/include/os/osFile.h b/include/os/osFile.h index 6ddf1e33c6..703ba196ef 100644 --- a/include/os/osFile.h +++ b/include/os/osFile.h @@ -87,7 +87,6 @@ int32_t taosRemoveFile(const char *path); void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath); -int64_t taosSendFile(SocketFd fdDst, TdFilePtr pFileSrc, int64_t *offset, int64_t size); int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, int64_t size); void *taosMmapReadOnlyFile(TdFilePtr pFile, int64_t length); diff --git a/include/os/osSocket.h b/include/os/osSocket.h index 395874a88c..cbecb380e2 100644 --- a/include/os/osSocket.h +++ b/include/os/osSocket.h @@ -16,6 +16,14 @@ #ifndef _TD_OS_SOCKET_H_ #define _TD_OS_SOCKET_H_ +// If the error is in a third-party library, place this header file under the third-party library header file. +#ifndef ALLOW_FORBID_FUNC + #define socket SOCKET_FUNC_TAOS_FORBID + #define bind BIND_FUNC_TAOS_FORBID + #define listen LISTEN_FUNC_TAOS_FORBID + // #define accept ACCEPT_FUNC_TAOS_FORBID +#endif + #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) #include "winsock2.h" #include @@ -30,6 +38,8 @@ extern "C" { #endif +#ifndef USE_UV + #define TAOS_EPOLL_WAIT_TIME 500 typedef int32_t SOCKET; typedef SOCKET EpollFd; @@ -50,7 +60,6 @@ void taosShutDownSocketRD(SOCKET fd); void taosShutDownSocketWR(SOCKET fd); int32_t taosSetNonblocking(SOCKET sock, int32_t on); void taosIgnSIGPIPE(); -void taosBlockSIGPIPE(); void taosSetMaskSIGPIPE(); int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen); int32_t taosGetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t *optlen); @@ -86,6 +95,10 @@ uint32_t taosGetIpv4FromFqdn(const char *); void tinet_ntoa(char *ipstr, uint32_t ip); uint32_t ip2uint(const char *const ip_addr); +#endif + +void taosBlockSIGPIPE(); + #ifdef __cplusplus } #endif diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index d4d9bff56c..985d2f2f2f 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -20,8 +20,6 @@ #include "rpcCache.h" #include "rpcHead.h" #include "rpcLog.h" -#include "rpcTcp.h" -#include "rpcUdp.h" #include "taoserror.h" #include "tglobal.h" #include "thash.h" diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index d080db753d..73137487eb 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -24,8 +24,6 @@ #include "rpcCache.h" #include "rpcHead.h" #include "rpcLog.h" -#include "rpcTcp.h" -#include "rpcUdp.h" #include "taoserror.h" #include "tglobal.h" #include "thash.h" diff --git a/source/libs/transport/src/rpcTcp.c b/source/libs/transport/src/rpcTcp.c index 56dd8cce25..d95ac3d36d 100644 --- a/source/libs/transport/src/rpcTcp.c +++ b/source/libs/transport/src/rpcTcp.c @@ -21,6 +21,7 @@ #include "taoserror.h" #include "tutil.h" +#ifndef USE_UV typedef struct SFdObj { void * signature; SOCKET fd; // TCP socket FD @@ -659,3 +660,4 @@ static void taosFreeFdObj(SFdObj *pFdObj) { tfree(pFdObj); } +#endif \ No newline at end of file diff --git a/source/libs/transport/src/rpcUdp.c b/source/libs/transport/src/rpcUdp.c index b57cf57c55..3640414a4c 100644 --- a/source/libs/transport/src/rpcUdp.c +++ b/source/libs/transport/src/rpcUdp.c @@ -22,6 +22,8 @@ #include "ttimer.h" #include "tutil.h" +#ifndef USE_UV + #define RPC_MAX_UDP_CONNS 256 #define RPC_MAX_UDP_PKTS 1000 #define RPC_UDP_BUF_TIME 5 // mseconds @@ -257,3 +259,4 @@ int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *c return ret; } +#endif \ No newline at end of file diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index c7b6ca2a2c..ce78d83bdf 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -56,7 +56,7 @@ typedef struct SSrvMsg { typedef struct SWorkThrdObj { pthread_t thread; uv_pipe_t* pipe; - int fd; + uv_os_fd_t fd; uv_loop_t* loop; SAsyncPool* asyncPool; // uv_async_t* workerAsync; // diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index 652e0b5182..acafbf6da8 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -667,17 +667,43 @@ int64_t taosSendFile(SocketFd dfd, FileFd sfd, int64_t *offset, int64_t count) { #else -int64_t taosSendFile(SocketFd fdDst, TdFilePtr pFileSrc, int64_t *offset, int64_t size) { - if (pFileSrc == NULL) { +// int64_t taosSendFile(int fdDst, TdFilePtr pFileSrc, int64_t *offset, int64_t size) { +// if (pFileSrc == NULL) { +// return 0; +// } +// assert(pFileSrc->fd >= 0); + +// int64_t leftbytes = size; +// int64_t sentbytes; + +// while (leftbytes > 0) { +// sentbytes = sendfile(fdDst, pFileSrc->fd, offset, leftbytes); +// if (sentbytes == -1) { +// if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { +// continue; +// } else { +// return -1; +// } +// } else if (sentbytes == 0) { +// return (int64_t)(size - leftbytes); +// } + +// leftbytes -= sentbytes; +// } + +// return size; +// } + +int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, int64_t size) { + if (pFileOut == NULL || pFileIn == NULL) { return 0; } - assert(pFileSrc->fd >= 0); - + assert(pFileIn->fd >= 0 && pFileOut->fd >= 0); int64_t leftbytes = size; int64_t sentbytes; while (leftbytes > 0) { - sentbytes = sendfile(fdDst, pFileSrc->fd, offset, leftbytes); + sentbytes = sendfile(pFileOut->fd, pFileIn->fd, offset, leftbytes); if (sentbytes == -1) { if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { continue; @@ -694,15 +720,6 @@ int64_t taosSendFile(SocketFd fdDst, TdFilePtr pFileSrc, int64_t *offset, int64_ return size; } -int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, int64_t size) { - if (pFileOut == NULL || pFileIn == NULL) { - return 0; - } - assert(pFileOut->fd >= 0); - - return taosSendFile(pFileOut->fd, pFileIn, offset, size); -} - #endif void taosFprintfFile(TdFilePtr pFile, const char *format, ...) { diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c index 07d30276b7..f27ad3a1e0 100644 --- a/source/os/src/osSocket.c +++ b/source/os/src/osSocket.c @@ -34,6 +34,24 @@ #include #endif +#ifndef USE_UV + +// typedef struct TdSocketServer { +// #if SOCKET_WITH_LOCK +// pthread_rwlock_t rwlock; +// #endif +// int refId; +// SocketFd fd; +// } * TdSocketServerPtr, TdSocketServer; + +// typedef struct TdSocketConnector { +// #if SOCKET_WITH_LOCK +// pthread_rwlock_t rwlock; +// #endif +// int refId; +// SocketFd fd; +// } * TdSocketConnectorPtr, TdSocketConnector; + #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) #define taosSend(sockfd, buf, len, flags) send((SOCKET)sockfd, buf, len, flags) @@ -115,15 +133,6 @@ int32_t taosSetNonblocking(SOCKET sock, int32_t on) { void taosIgnSIGPIPE() { signal(SIGPIPE, SIG_IGN); } -void taosBlockSIGPIPE() { - sigset_t signal_mask; - sigemptyset(&signal_mask); - sigaddset(&signal_mask, SIGPIPE); - int32_t rc = pthread_sigmask(SIG_BLOCK, &signal_mask, NULL); - if (rc != 0) { - //printf("failed to block SIGPIPE"); - } -} void taosSetMaskSIGPIPE() { sigset_t signal_mask; @@ -215,7 +224,6 @@ int32_t taosSetNonblocking(SOCKET sock, int32_t on) { } void taosIgnSIGPIPE() {} -void taosBlockSIGPIPE() {} void taosSetMaskSIGPIPE() {} int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) { @@ -786,3 +794,21 @@ int64_t taosCopyFds(SOCKET sfd, int32_t dfd, int64_t len) { return len; } + +#endif + + + +#if !(defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)) +void taosBlockSIGPIPE() { + sigset_t signal_mask; + sigemptyset(&signal_mask); + sigaddset(&signal_mask, SIGPIPE); + int32_t rc = pthread_sigmask(SIG_BLOCK, &signal_mask, NULL); + if (rc != 0) { + //printf("failed to block SIGPIPE"); + } +} +#else +void taosBlockSIGPIPE() {} +#endif \ No newline at end of file diff --git a/source/util/CMakeLists.txt b/source/util/CMakeLists.txt index 7a47639e75..6effdff712 100644 --- a/source/util/CMakeLists.txt +++ b/source/util/CMakeLists.txt @@ -12,6 +12,12 @@ target_link_libraries( PUBLIC lz4_static PUBLIC api cjson ) +if(${BUILD_WITH_UV}) + target_link_libraries( + util + PUBLIC uv_a + ) +endif(${BUILD_TEST}) if(${BUILD_TEST}) ADD_SUBDIRECTORY(test) diff --git a/source/util/src/thttp.c b/source/util/src/thttp.c index 0737f67ed1..593f3c43c2 100644 --- a/source/util/src/thttp.c +++ b/source/util/src/thttp.c @@ -18,6 +18,67 @@ #include "taoserror.h" #include "tlog.h" +#ifdef USE_UV + +#include + +void clientConnCb(uv_connect_t* req, int status) { + if(status < 0) { + terrno = TAOS_SYSTEM_ERROR(status); + uError("Connection error %s\n",uv_strerror(status)); + return; + } + + // impl later + uv_buf_t* wb = req->data; + if (wb == NULL) { + uv_close((uv_handle_t *)req->handle,NULL); + } + uv_write_t write_req; + uv_write(&write_req, req->handle, wb, 2, NULL); + uv_close((uv_handle_t *)req->handle,NULL); +} + +int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen) { + uint32_t ipv4 = taosGetIpv4FromFqdn(server); + if (ipv4 == 0xffffffff) { + terrno = TAOS_SYSTEM_ERROR(errno); + uError("failed to get http server:%s ip since %s", server, terrstr()); + return -1; + // goto SEND_OVER; + } + char ipv4Buf[128]; + tinet_ntoa(ipv4Buf, ipv4); + + struct sockaddr_in dest; + uv_ip4_addr(ipv4Buf, port, &dest); + + uv_tcp_t socket_tcp; + uv_loop_t *loop = uv_default_loop(); + uv_tcp_init(loop, &socket_tcp); + uv_connect_t* connect = (uv_connect_t*)malloc(sizeof(uv_connect_t)); + + char header[4096] = {0}; + int32_t headLen = snprintf(header, sizeof(header), + "POST /report HTTP/1.1\n" + "Host: %s\n" + "Content-Type: application/json\n" + "Content-Length: %d\n\n", + server, contLen); + uv_buf_t wb[2]; + wb[0] = uv_buf_init((char*)header, headLen); + wb[1] = uv_buf_init((char*)pCont, contLen); + + connect->data = wb; + uv_tcp_connect(connect, &socket_tcp, (const struct sockaddr*)&dest, clientConnCb); + terrno = 0; + uv_run(loop,UV_RUN_DEFAULT); + uv_loop_close(loop); + free(connect); + return terrno; +} + +#else int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen) { int32_t code = -1; SOCKET fd = 0; @@ -73,3 +134,5 @@ SEND_OVER: return code; } + +#endif \ No newline at end of file From 744f6768930ab8438b3761f7ca69cc3f560d402f Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 7 Mar 2022 12:27:09 +0800 Subject: [PATCH 3/6] test --- source/libs/index/test/indexTests.cc | 45 +++++++++++++++++++++++++--- 1 file changed, 41 insertions(+), 4 deletions(-) diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index 3f46a042ae..ce3f7fe25e 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -1058,6 +1058,45 @@ TEST_F(IndexEnv2, testIndex_read_performance4) { std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl; assert(3 == index->SearchOne("tag10", "Hello")); } +TEST_F(IndexEnv2, testIndex_cache_del) { + std::string path = "/tmp/cache_and_tfile"; + if (index->Init(path) != 0) { + } + for (int i = 0; i < 100; i++) { + index->PutOneTarge("tag10", "Hello", i); + } + index->Del("tag10", "Hello", 12); + index->Del("tag10", "Hello", 11); + + // index->WriteMultiMillonData("tag10", "xxxxxxxxxxxxxx", 100 * 10000); + index->Del("tag10", "Hello", 17); + EXPECT_EQ(97, index->SearchOne("tag10", "Hello")); + + index->PutOneTarge("tag10", "Hello", 17); // add again + EXPECT_EQ(98, index->SearchOne("tag10", "Hello")); + + // del all + for (int i = 0; i < 200; i++) { + index->Del("tag10", "Hello", i); + } + EXPECT_EQ(0, index->SearchOne("tag10", "Hello")); + + // add other item + for (int i = 0; i < 2000; i++) { + index->PutOneTarge("tag10", "World", i); + } + + for (int i = 0; i < 2000; i++) { + index->PutOneTarge("tag10", "Hello", i); + } + EXPECT_EQ(2000, index->SearchOne("tag10", "Hello")); + + for (int i = 0; i < 2000; i++) { + index->Del("tag10", "Hello", i); + } + EXPECT_EQ(0, index->SearchOne("tag10", "Hello")); +} + TEST_F(IndexEnv2, testIndex_del) { std::string path = "/tmp/cache_and_tfile"; if (index->Init(path) != 0) { @@ -1069,8 +1108,6 @@ TEST_F(IndexEnv2, testIndex_del) { index->Del("tag10", "Hello", 11); index->WriteMultiMillonData("tag10", "xxxxxxxxxxxxxx", 100 * 10000); - - EXPECT_EQ(98, index->SearchOne("tag10", "Hello")); - // std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl; - // assert(3 == index->SearchOne("tag10", "Hello")); + index->Del("tag10", "Hello", 17); + EXPECT_EQ(97, index->SearchOne("tag10", "Hello")); } From e6a6887864e027fde2bcbff2b5c9d3b5304276f1 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 7 Mar 2022 13:40:14 +0800 Subject: [PATCH 4/6] minor changes --- include/os/osSocket.h | 4 ++++ source/util/src/thttp.c | 15 ++++++--------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/include/os/osSocket.h b/include/os/osSocket.h index cbecb380e2..57baabef03 100644 --- a/include/os/osSocket.h +++ b/include/os/osSocket.h @@ -34,6 +34,10 @@ #include #endif +#ifdef USE_UV + #include +#endif + #ifdef __cplusplus extern "C" { #endif diff --git a/source/util/src/thttp.c b/source/util/src/thttp.c index 593f3c43c2..b8d73478ea 100644 --- a/source/util/src/thttp.c +++ b/source/util/src/thttp.c @@ -19,10 +19,7 @@ #include "tlog.h" #ifdef USE_UV - -#include - -void clientConnCb(uv_connect_t* req, int status) { +static void clientConnCb(uv_connect_t* req, int32_t status) { if(status < 0) { terrno = TAOS_SYSTEM_ERROR(status); uError("Connection error %s\n",uv_strerror(status)); @@ -45,20 +42,20 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, terrno = TAOS_SYSTEM_ERROR(errno); uError("failed to get http server:%s ip since %s", server, terrstr()); return -1; - // goto SEND_OVER; } - char ipv4Buf[128]; + + char ipv4Buf[128] = {0}; tinet_ntoa(ipv4Buf, ipv4); - struct sockaddr_in dest; + struct sockaddr_in dest = {0}; uv_ip4_addr(ipv4Buf, port, &dest); - uv_tcp_t socket_tcp; + uv_tcp_t socket_tcp = {0}; uv_loop_t *loop = uv_default_loop(); uv_tcp_init(loop, &socket_tcp); uv_connect_t* connect = (uv_connect_t*)malloc(sizeof(uv_connect_t)); - char header[4096] = {0}; + char header[1024] = {0}; int32_t headLen = snprintf(header, sizeof(header), "POST /report HTTP/1.1\n" "Host: %s\n" From bef117d7db3485ea146eead2f55e01a1492d2f37 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 7 Mar 2022 13:52:49 +0800 Subject: [PATCH 5/6] enable monitor gzip --- include/common/tglobal.h | 1 + include/libs/monitor/monitor.h | 1 + include/util/thttp.h | 4 ++- source/common/src/tglobal.c | 3 ++ source/dnode/mgmt/impl/src/dndEnv.c | 2 +- source/dnode/mnode/impl/src/mndTelem.c | 2 +- source/libs/monitor/inc/monInt.h | 1 + source/libs/monitor/src/monitor.c | 3 +- source/util/src/thttp.c | 49 ++++++++++++++++---------- 9 files changed, 44 insertions(+), 22 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 2261170e63..ba41cb0292 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -58,6 +58,7 @@ extern int32_t tsMonitorInterval; extern char tsMonitorFqdn[]; extern uint16_t tsMonitorPort; extern int32_t tsMonitorMaxLogs; +extern bool tsMonitorComp; // query buffer management extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing diff --git a/include/libs/monitor/monitor.h b/include/libs/monitor/monitor.h index 0c832f802b..a3e049b00c 100644 --- a/include/libs/monitor/monitor.h +++ b/include/libs/monitor/monitor.h @@ -130,6 +130,7 @@ typedef struct { const char *server; uint16_t port; int32_t maxLogs; + bool comp; } SMonCfg; int32_t monInit(const SMonCfg *pCfg); diff --git a/include/util/thttp.h b/include/util/thttp.h index f211b2615d..31d84850e0 100644 --- a/include/util/thttp.h +++ b/include/util/thttp.h @@ -22,7 +22,9 @@ extern "C" { #endif -int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen); +typedef enum { HTTP_GZIP, HTTP_FLAT } EHttpCompFlag; + +int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen, EHttpCompFlag flag); #ifdef __cplusplus } diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index a521e10cc7..f62f8e5d7a 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -52,6 +52,7 @@ int32_t tsMonitorInterval = 5; char tsMonitorFqdn[TSDB_FQDN_LEN] = {0}; uint16_t tsMonitorPort = 6043; int32_t tsMonitorMaxLogs = 100; +bool tsMonitorComp = false; /* * denote if the server needs to compress response message at the application layer to client, including query rsp, @@ -346,6 +347,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, 0) != 0) return -1; if (cfgAddInt32(pCfg, "monitorPort", tsMonitorPort, 1, 65056, 0) != 0) return -1; if (cfgAddInt32(pCfg, "monitorMaxLogs", tsMonitorMaxLogs, 1, 1000000, 0) != 0) return -1; + if (cfgAddBool(pCfg, "monitorComp", tsMonitorComp, 0) != 0) return -1; return 0; } @@ -462,6 +464,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tstrncpy(tsMonitorFqdn, cfgGetItem(pCfg, "monitorFqdn")->str, TSDB_FQDN_LEN); tsMonitorPort = (uint16_t)cfgGetItem(pCfg, "monitorPort")->i32; tsMonitorMaxLogs = cfgGetItem(pCfg, "monitorMaxLogs")->i32; + tsMonitorComp = cfgGetItem(pCfg, "monitorComp")->bval; if (tsQueryBufferSize >= 0) { tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; diff --git a/source/dnode/mgmt/impl/src/dndEnv.c b/source/dnode/mgmt/impl/src/dndEnv.c index 1cbdf0c1dc..84b2dca326 100644 --- a/source/dnode/mgmt/impl/src/dndEnv.c +++ b/source/dnode/mgmt/impl/src/dndEnv.c @@ -298,7 +298,7 @@ int32_t dndInit() { return -1; } - SMonCfg monCfg = {.maxLogs = tsMonitorMaxLogs, .port = tsMonitorPort, .server = tsMonitorFqdn}; + SMonCfg monCfg = {.maxLogs = tsMonitorMaxLogs, .port = tsMonitorPort, .server = tsMonitorFqdn, .comp = tsMonitorComp}; if (monInit(&monCfg) != 0) { dError("failed to init monitor since %s", terrstr()); dndCleanup(); diff --git a/source/dnode/mnode/impl/src/mndTelem.c b/source/dnode/mnode/impl/src/mndTelem.c index 0e2141a4d7..453535c6e7 100644 --- a/source/dnode/mnode/impl/src/mndTelem.c +++ b/source/dnode/mnode/impl/src/mndTelem.c @@ -87,7 +87,7 @@ static int32_t mndProcessTelemTimer(SMnodeMsg* pReq) { taosWLockLatch(&pMgmt->lock); char* pCont = mndBuildTelemetryReport(pMnode); if (pCont != NULL) { - taosSendHttpReport(TELEMETRY_SERVER, TELEMETRY_PORT, pCont, strlen(pCont)); + taosSendHttpReport(TELEMETRY_SERVER, TELEMETRY_PORT, pCont, strlen(pCont), HTTP_FLAT); free(pCont); } taosWUnLockLatch(&pMgmt->lock); diff --git a/source/libs/monitor/inc/monInt.h b/source/libs/monitor/inc/monInt.h index bfb73af034..c3b6569555 100644 --- a/source/libs/monitor/inc/monInt.h +++ b/source/libs/monitor/inc/monInt.h @@ -54,6 +54,7 @@ typedef struct { int32_t maxLogs; const char *server; uint16_t port; + bool comp; SMonState state; } SMonitor; diff --git a/source/libs/monitor/src/monitor.c b/source/libs/monitor/src/monitor.c index 354989a7a1..905ff8d6da 100644 --- a/source/libs/monitor/src/monitor.c +++ b/source/libs/monitor/src/monitor.c @@ -45,6 +45,7 @@ int32_t monInit(const SMonCfg *pCfg) { tsMonitor.maxLogs = pCfg->maxLogs; tsMonitor.server = pCfg->server; tsMonitor.port = pCfg->port; + tsMonitor.comp = pCfg->comp; tsLogFp = monRecordLog; tsMonitor.state.time = taosGetTimestampMs(); pthread_mutex_init(&tsMonitor.lock, NULL); @@ -375,7 +376,7 @@ void monSendReport(SMonInfo *pMonitor) { char *pCont = tjsonToString(pMonitor->pJson); if (pCont != NULL) { - taosSendHttpReport(tsMonitor.server, tsMonitor.port, pCont, strlen(pCont)); + taosSendHttpReport(tsMonitor.server, tsMonitor.port, pCont, strlen(pCont), tsMonitor.comp); free(pCont); } } diff --git a/source/util/src/thttp.c b/source/util/src/thttp.c index b8d73478ea..c8a52a4735 100644 --- a/source/util/src/thttp.c +++ b/source/util/src/thttp.c @@ -18,6 +18,28 @@ #include "taoserror.h" #include "tlog.h" +static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pHead, int32_t headLen, + EHttpCompFlag flag) { + if (flag == HTTP_FLAT) { + return snprintf(pHead, headLen, + "POST /report HTTP/1.1\n" + "Host: %s\n" + "Content-Type: application/json\n" + "Content-Length: %d\n\n", + server, contLen); + } else if (flag == HTTP_GZIP) { + return snprintf(pHead, headLen, + "POST /report HTTP/1.1\n" + "Host: %s\n" + "Content-Type: application/json\n" + "Content-Encoding: gzip\n" + "Content-Length: %d\n\n", + server, contLen); + } else { + return -1; + } +} + #ifdef USE_UV static void clientConnCb(uv_connect_t* req, int32_t status) { if(status < 0) { @@ -36,7 +58,7 @@ static void clientConnCb(uv_connect_t* req, int32_t status) { uv_close((uv_handle_t *)req->handle,NULL); } -int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen) { +int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen, EHttpCompFlag flag) { uint32_t ipv4 = taosGetIpv4FromFqdn(server); if (ipv4 == 0xffffffff) { terrno = TAOS_SYSTEM_ERROR(errno); @@ -50,18 +72,14 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, struct sockaddr_in dest = {0}; uv_ip4_addr(ipv4Buf, port, &dest); - uv_tcp_t socket_tcp = {0}; - uv_loop_t *loop = uv_default_loop(); + uv_tcp_t socket_tcp = {0}; + uv_loop_t* loop = uv_default_loop(); uv_tcp_init(loop, &socket_tcp); uv_connect_t* connect = (uv_connect_t*)malloc(sizeof(uv_connect_t)); char header[1024] = {0}; - int32_t headLen = snprintf(header, sizeof(header), - "POST /report HTTP/1.1\n" - "Host: %s\n" - "Content-Type: application/json\n" - "Content-Length: %d\n\n", - server, contLen); + int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag); + uv_buf_t wb[2]; wb[0] = uv_buf_init((char*)header, headLen); wb[1] = uv_buf_init((char*)pCont, contLen); @@ -76,7 +94,7 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, } #else -int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen) { +int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen, EHttpCompFlag flag) { int32_t code = -1; SOCKET fd = 0; @@ -94,15 +112,10 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, goto SEND_OVER; } - char header[4096] = {0}; - int32_t headLen = snprintf(header, sizeof(header), - "POST /report HTTP/1.1\n" - "Host: %s\n" - "Content-Type: application/json\n" - "Content-Length: %d\n\n", - server, contLen); + char header[1024] = {0}; + int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag); - if (taosWriteSocket(fd, (void*)header, headLen) < 0) { + if (taosWriteSocket(fd, header, headLen) < 0) { terrno = TAOS_SYSTEM_ERROR(errno); uError("failed to send http header to %s:%u since %s", server, port, terrstr()); goto SEND_OVER; From 514f9f6626de43b4657b12733ae1d2d89a52a620 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 7 Mar 2022 14:54:57 +0800 Subject: [PATCH 6/6] enable monitor gzip --- include/util/taoserror.h | 1 + include/util/thttp.h | 2 +- source/libs/monitor/src/monitor.c | 3 +- source/util/CMakeLists.txt | 2 +- source/util/src/terror.c | 1 + source/util/src/thttp.c | 101 +++++++++++++++++++++++++++--- 6 files changed, 100 insertions(+), 10 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 5fe69284a2..6ad34eb0c0 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -84,6 +84,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_INVALID_VERSION_NUMBER TAOS_DEF_ERROR_CODE(0, 0x0120) #define TSDB_CODE_INVALID_VERSION_STRING TAOS_DEF_ERROR_CODE(0, 0x0121) #define TSDB_CODE_VERSION_NOT_COMPATIBLE TAOS_DEF_ERROR_CODE(0, 0x0122) +#define TSDB_CODE_COMPRESS_ERROR TAOS_DEF_ERROR_CODE(0, 0x0123) //client #define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) diff --git a/include/util/thttp.h b/include/util/thttp.h index 31d84850e0..7d8c588bfc 100644 --- a/include/util/thttp.h +++ b/include/util/thttp.h @@ -24,7 +24,7 @@ extern "C" { typedef enum { HTTP_GZIP, HTTP_FLAT } EHttpCompFlag; -int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen, EHttpCompFlag flag); +int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag); #ifdef __cplusplus } diff --git a/source/libs/monitor/src/monitor.c b/source/libs/monitor/src/monitor.c index 905ff8d6da..0760d7ae9e 100644 --- a/source/libs/monitor/src/monitor.c +++ b/source/libs/monitor/src/monitor.c @@ -376,7 +376,8 @@ void monSendReport(SMonInfo *pMonitor) { char *pCont = tjsonToString(pMonitor->pJson); if (pCont != NULL) { - taosSendHttpReport(tsMonitor.server, tsMonitor.port, pCont, strlen(pCont), tsMonitor.comp); + EHttpCompFlag flag = tsMonitor.comp ? HTTP_GZIP : HTTP_FLAT; + taosSendHttpReport(tsMonitor.server, tsMonitor.port, pCont, strlen(pCont), flag); free(pCont); } } diff --git a/source/util/CMakeLists.txt b/source/util/CMakeLists.txt index 6effdff712..42950b2284 100644 --- a/source/util/CMakeLists.txt +++ b/source/util/CMakeLists.txt @@ -10,7 +10,7 @@ target_link_libraries( util PRIVATE os PUBLIC lz4_static - PUBLIC api cjson + PUBLIC api cjson zlib ) if(${BUILD_WITH_UV}) target_link_libraries( diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 9c2fef5b47..6cf6eeb371 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -68,6 +68,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_TIME_STAMP, "Client and server's t TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, "Database not ready") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_VERSION, "Invalid app version") +TAOS_DEFINE_ERROR(TSDB_CODE_COMPRESS_ERROR, "Failed to compress msg") //common & util TAOS_DEFINE_ERROR(TSDB_CODE_OPS_NOT_SUPPORT, "Operation not supported") diff --git a/source/util/src/thttp.c b/source/util/src/thttp.c index c8a52a4735..9d5df20337 100644 --- a/source/util/src/thttp.c +++ b/source/util/src/thttp.c @@ -17,6 +17,7 @@ #include "thttp.h" #include "taoserror.h" #include "tlog.h" +#include "zlib.h" static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pHead, int32_t headLen, EHttpCompFlag flag) { @@ -40,22 +41,91 @@ static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pH } } +int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) { + int32_t code = -1; + int32_t destLen = srcLen; + void* pDest = malloc(destLen); + + if (pDest == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + + z_stream gzipStream = {0}; + gzipStream.zalloc = (alloc_func)0; + gzipStream.zfree = (free_func)0; + gzipStream.opaque = (voidpf)0; + if (deflateInit2(&gzipStream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + + gzipStream.next_in = (Bytef*)pSrc; + gzipStream.avail_in = (uLong)srcLen; + gzipStream.next_out = (Bytef*)pDest; + gzipStream.avail_out = (uLong)(destLen); + + while (gzipStream.avail_in != 0 && gzipStream.total_out < (uLong)(destLen)) { + if (deflate(&gzipStream, Z_FULL_FLUSH) != Z_OK) { + terrno = TSDB_CODE_COMPRESS_ERROR; + goto _OVER; + } + } + + if (gzipStream.avail_in != 0) { + terrno = TSDB_CODE_COMPRESS_ERROR; + goto _OVER; + } + + int32_t err = 0; + while (1) { + if ((err = deflate(&gzipStream, Z_FINISH)) == Z_STREAM_END) { + break; + } + if (err != Z_OK) { + terrno = TSDB_CODE_COMPRESS_ERROR; + goto _OVER; + } + } + + if (deflateEnd(&gzipStream) != Z_OK) { + terrno = TSDB_CODE_COMPRESS_ERROR; + goto _OVER; + } + + if (gzipStream.total_out >= srcLen) { + terrno = TSDB_CODE_COMPRESS_ERROR; + goto _OVER; + } + + code = 0; + +_OVER: + if (code == 0) { + memcpy(pSrc, pDest, gzipStream.total_out); + code = gzipStream.total_out; + } + + free(pDest); + return code; +} + #ifdef USE_UV static void clientConnCb(uv_connect_t* req, int32_t status) { - if(status < 0) { + if (status < 0) { terrno = TAOS_SYSTEM_ERROR(status); - uError("Connection error %s\n",uv_strerror(status)); + uError("Connection error %s\n", uv_strerror(status)); return; } // impl later uv_buf_t* wb = req->data; if (wb == NULL) { - uv_close((uv_handle_t *)req->handle,NULL); + uv_close((uv_handle_t*)req->handle, NULL); } uv_write_t write_req; uv_write(&write_req, req->handle, wb, 2, NULL); - uv_close((uv_handle_t *)req->handle,NULL); + uv_close((uv_handle_t*)req->handle, NULL); } int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen, EHttpCompFlag flag) { @@ -77,6 +147,15 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, uv_tcp_init(loop, &socket_tcp); uv_connect_t* connect = (uv_connect_t*)malloc(sizeof(uv_connect_t)); + if (flag == HTTP_GZIP) { + int32_t dstLen = taosCompressHttpRport(pCont, contLen); + if (dstLen > 0) { + contLen = dstLen; + } else { + flag = HTTP_FLAT; + } + } + char header[1024] = {0}; int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag); @@ -87,14 +166,14 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, connect->data = wb; uv_tcp_connect(connect, &socket_tcp, (const struct sockaddr*)&dest, clientConnCb); terrno = 0; - uv_run(loop,UV_RUN_DEFAULT); + uv_run(loop, UV_RUN_DEFAULT); uv_loop_close(loop); free(connect); return terrno; } #else -int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen, EHttpCompFlag flag) { +int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { int32_t code = -1; SOCKET fd = 0; @@ -112,6 +191,15 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, goto SEND_OVER; } + if (flag == HTTP_GZIP) { + int32_t dstLen = taosCompressHttpRport(pCont, contLen); + if (dstLen > 0) { + contLen = dstLen; + } else { + flag = HTTP_FLAT; + } + } + char header[1024] = {0}; int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag); @@ -134,7 +222,6 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, goto SEND_OVER; } - uTrace("send http to %s:%u, len:%d content: %s", server, port, contLen, pCont); code = 0; SEND_OVER: