diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 6a290dda15..e797911add 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -219,8 +219,8 @@ endif(${BUILD_WITH_NURAFT}) # pthread if(${BUILD_PTHREAD}) set(CMAKE_BUILD_TYPE release) - add_definitions(-DPTW32_STATIC_LIB) - add_subdirectory(pthread EXCLUDE_FROM_ALL) + add_definitions(-DPTW32_STATIC_LIB) + add_subdirectory(pthread) set_target_properties(libpthreadVC3 PROPERTIES OUTPUT_NAME pthread) add_library(pthread STATIC IMPORTED GLOBAL) SET_PROPERTY(TARGET pthread PROPERTY IMPORTED_LOCATION ${LIBRARY_OUTPUT_PATH}/pthread.lib) diff --git a/include/client/taos.h b/include/client/taos.h index 0f1633ed6f..1ac92c61a5 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -121,13 +121,14 @@ typedef struct setConfRet { DLL_EXPORT void taos_cleanup(void); DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...); DLL_EXPORT setConfRet taos_set_config(const char *config); +DLL_EXPORT int taos_init(void); DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port); -DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, +DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port); -DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port); -DLL_EXPORT void taos_close(TAOS *taos); +DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port); +DLL_EXPORT void taos_close(TAOS *taos); -const char *taos_data_type(int type); +const char *taos_data_type(int type); DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos); DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 8ea5cbbf34..da48846a8f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1521,8 +1521,8 @@ typedef struct { char* qmsg2; // pAst2:qmsg2:SRetention2 => trigger aggr task2 } SRSmaParam; -int tEncodeSRSmaParam(SCoder* pCoder, const SRSmaParam* pRSmaParam); -int tDecodeSRSmaParam(SCoder* pCoder, SRSmaParam* pRSmaParam); +int32_t tEncodeSRSmaParam(SCoder* pCoder, const SRSmaParam* pRSmaParam); +int32_t tDecodeSRSmaParam(SCoder* pCoder, SRSmaParam* pRSmaParam); typedef struct SVCreateStbReq { const char* name; @@ -1538,6 +1538,10 @@ int tDecodeSVCreateStbReq(SCoder* pCoder, SVCreateStbReq* pReq); typedef struct SVDropStbReq { // data +#ifdef WINDOWS + size_t avoidCompilationErrors; +#endif + } SVDropStbReq; typedef struct SVCreateStbRsp { @@ -2117,7 +2121,7 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SCoder* pDecoder, SSchemaWrapp if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1; if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1; - pSW->pSchema = (SSchema*)TCODER_MALLOC(pDecoder, sizeof(SSchema) * pSW->nCols); + pSW->pSchema = (SSchema*)tCoderMalloc(pDecoder, sizeof(SSchema) * pSW->nCols); if (pSW->pSchema == NULL) return -1; for (int32_t i = 0; i < pSW->nCols; i++) { if (tDecodeSSchema(pDecoder, &pSW->pSchema[i]) < 0) return -1; diff --git a/include/common/trow.h b/include/common/trow.h index 8732497dbb..464e0dd69f 100644 --- a/include/common/trow.h +++ b/include/common/trow.h @@ -214,6 +214,14 @@ STSRow *tdRowDup(STSRow *row); static FORCE_INLINE SKvRowIdx *tdKvRowColIdxAt(STSRow *pRow, col_id_t idx) { return (SKvRowIdx *)TD_ROW_COL_IDX(pRow) + idx; } +static FORCE_INLINE int16_t tdKvRowColIdAt(STSRow *pRow, col_id_t idx) { + ASSERT(idx >= 0); + if (idx == 0) { + return PRIMARYKEY_TIMESTAMP_COL_ID; + } + + return ((SKvRowIdx *)TD_ROW_COL_IDX(pRow) + idx - 1)->colId; +} static FORCE_INLINE void *tdKVRowColVal(STSRow *pRow, SKvRowIdx *pIdx) { return POINTER_SHIFT(pRow, pIdx->offset); } #define TD_ROW_OFFSET(p) ((p)->toffset); // During ParseInsert when without STSchema, how to get the offset for STpRow? @@ -668,7 +676,7 @@ static int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) { case TD_ROW_KV: #ifdef TD_SUPPORT_BITMAP pBuilder->pBitmap = tdGetBitmapAddrKv(pBuilder->pBuf, pBuilder->nBoundCols); - memset(pBuilder->pBitmap, TD_VTYPE_NONE_BYTE_II, pBuilder->nBitmaps); + memset(pBuilder->pBitmap, TD_VTYPE_NONE_BYTE_II, pBuilder->nBoundBitmaps); #endif len = TD_ROW_HEAD_LEN + TD_ROW_NCOLS_LEN + (pBuilder->nBoundCols - 1) * sizeof(SKvRowIdx) + pBuilder->nBoundBitmaps; // add @@ -1092,7 +1100,7 @@ static FORCE_INLINE bool tdGetKvRowValOfColEx(STSRowIter *pIter, col_id_t colId, STSRow *pRow = pIter->pRow; SKvRowIdx *pKvIdx = NULL; bool colFound = false; - col_id_t kvNCols = tdRowGetNCols(pRow); + col_id_t kvNCols = tdRowGetNCols(pRow) - 1; while (*nIdx < kvNCols) { pKvIdx = (SKvRowIdx *)POINTER_SHIFT(TD_ROW_COL_IDX(pRow), *nIdx * sizeof(SKvRowIdx)); if (pKvIdx->colId == colId) { @@ -1108,7 +1116,14 @@ static FORCE_INLINE bool tdGetKvRowValOfColEx(STSRowIter *pIter, col_id_t colId, } } - if (!colFound) return false; + if (!colFound) { + if(colId <= pIter->maxColId) { + pVal->valType = TD_VTYPE_NONE; + return true; + } else { + return false; + } + } #ifdef TD_SUPPORT_BITMAP int16_t colIdx = -1; diff --git a/include/os/os.h b/include/os/os.h index 86abcc15f5..329ad481aa 100644 --- a/include/os/os.h +++ b/include/os/os.h @@ -59,6 +59,7 @@ extern "C" { #include #endif +#define __typeof(a) auto #endif diff --git a/include/os/osMath.h b/include/os/osMath.h index cabb821844..829bbd847b 100644 --- a/include/os/osMath.h +++ b/include/os/osMath.h @@ -23,27 +23,21 @@ extern "C" { #define TPOW2(x) ((x) * (x)) #define TABS(x) ((x) > 0 ? (x) : -(x)) +#define TSWAP(a, b) \ + do { \ + __typeof(a) __tmp = (a); \ + (a) = (b); \ + (b) = __tmp; \ + } while (0) + #ifdef WINDOWS - #define TSWAP(a, b, c) \ - do { \ - c __tmp = (c)(a); \ - (a) = (c)(b); \ - (b) = __tmp; \ - } while (0) #define TMAX(a, b) (((a) > (b)) ? (a) : (b)) #define TMIN(a, b) (((a) < (b)) ? (a) : (b)) #define TRANGE(aa, bb, cc) ((aa) = TMAX((aa), (bb)),(aa) = TMIN((aa), (cc))) #else - #define TSWAP(a, b, c) \ - do { \ - __typeof(a) __tmp = (a); \ - (a) = (b); \ - (b) = __tmp; \ - } while (0) - #define TMAX(a, b) \ ({ \ __typeof(a) __a = (a); \ @@ -51,12 +45,12 @@ extern "C" { (__a > __b) ? __a : __b; \ }) - #define TMIN(a, b) \ - ({ \ - __typeof(a) __a = (a); \ - __typeof(b) __b = (b); \ - (__a < __b) ? __a : __b; \ - }) +#define TMIN(a, b) \ + ({ \ + __typeof(a) __a = (a); \ + __typeof(b) __b = (b); \ + (__a < __b) ? __a : __b; \ + }) #define TRANGE(a, b, c) \ ({ \ diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 187f036653..cb949468eb 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -62,6 +62,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_APP_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0014) #define TSDB_CODE_RPC_FQDN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0015) #define TSDB_CODE_RPC_INVALID_VERSION TAOS_DEF_ERROR_CODE(0, 0x0016) +#define TSDB_CODE_RPC_PORT_EADDRINUSE TAOS_DEF_ERROR_CODE(0, 0x0017) //common & util #define TSDB_CODE_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0100) diff --git a/include/util/tencode.h b/include/util/tencode.h index 0236fe58da..c5066996f3 100644 --- a/include/util/tencode.h +++ b/include/util/tencode.h @@ -79,31 +79,52 @@ typedef struct { #define TD_CODER_CURRENT(CODER) ((CODER)->data + (CODER)->pos) #define TD_CODER_MOVE_POS(CODER, MOVE) ((CODER)->pos += (MOVE)) #define TD_CODER_CHECK_CAPACITY_FAILED(CODER, EXPSIZE) (((CODER)->size - (CODER)->pos) < (EXPSIZE)) -#define TCODER_MALLOC(PCODER, SIZE) \ - ({ \ - void* ptr = NULL; \ - SCoderMem* pMem = (SCoderMem*)taosMemoryMalloc(sizeof(*pMem) + (SIZE)); \ - if (pMem) { \ - pMem->next = (PCODER)->mList; \ - (PCODER)->mList = pMem; \ - ptr = (void*)&pMem[1]; \ - } \ - ptr; \ - }) +// #define TCODER_MALLOC(PCODER, SIZE) \ +// ({ \ +// void* ptr = NULL; \ +// SCoderMem* pMem = (SCoderMem*)taosMemoryMalloc(sizeof(*pMem) + (SIZE)); \ +// if (pMem) { \ +// pMem->next = (PCODER)->mList; \ +// (PCODER)->mList = pMem; \ +// ptr = (void*)&pMem[1]; \ +// } \ +// ptr; \ +// }) +static FORCE_INLINE void* tCoderMalloc(SCoder* pCoder, int32_t size) { + void* ptr = NULL; + SCoderMem* pMem = (SCoderMem*)taosMemoryMalloc(sizeof(SCoderMem*) + size); + if (pMem) { + pMem->next = pCoder->mList; + pCoder->mList = pMem; + ptr = (void*)&pMem[1]; + } + return ptr; +} -#define tEncodeSize(E, S, SIZE) \ - ({ \ +#define tEncodeSize(E, S, SIZE, RET) \ + do{ \ SCoder coder = {0}; \ - int ret = 0; \ tCoderInit(&coder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); \ if ((E)(&coder, S) == 0) { \ SIZE = coder.pos; \ } else { \ - ret = -1; \ + RET = -1; \ } \ tCoderClear(&coder); \ - ret; \ - }) + }while(0) +// #define tEncodeSize(E, S, SIZE) \ +// ({ \ +// SCoder coder = {0}; \ +// int ret = 0; \ +// tCoderInit(&coder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); \ +// if ((E)(&coder, S) == 0) { \ +// SIZE = coder.pos; \ +// } else { \ +// ret = -1; \ +// } \ +// tCoderClear(&coder); \ +// ret; \ +// }) void tCoderInit(SCoder* pCoder, td_endian_t endian, uint8_t* data, int32_t size, td_coder_t type); void tCoderClear(SCoder* pCoder); diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 41448a4391..0ff542358d 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -254,8 +254,6 @@ extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code); SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj); -int taos_init(); - void* createTscObj(const char* user, const char* auth, const char* db, SAppInstInfo* pAppInfo); void destroyTscObj(void* pObj); STscObj* acquireTscObj(int64_t rid); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index f37ace050a..b876f9b65c 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -188,8 +188,8 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC setResPrecision(&pRequest->body.resInfo, (*pQuery)->precision); } - TSWAP(pRequest->dbList, (*pQuery)->pDbList, SArray*); - TSWAP(pRequest->tableList, (*pQuery)->pTableList, SArray*); + TSWAP(pRequest->dbList, (*pQuery)->pDbList); + TSWAP(pRequest->tableList, (*pQuery)->pTableList); } return code; @@ -358,8 +358,15 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) { SQuery* pQuery = NULL; int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest); - if (TSDB_CODE_SUCCESS == code) { - code = parseSql(pRequest, false, &pQuery, NULL); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return NULL; + } + + code = parseSql(pRequest, false, &pQuery, NULL); + if (code != TSDB_CODE_SUCCESS) { + pRequest->code = code; + return pRequest; } return launchQueryImpl(pRequest, pQuery, code, false); @@ -410,7 +417,7 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { while (retryNum++ < REQUEST_MAX_TRY_TIMES) { pRequest = launchQuery(pTscObj, sql, sqlLen); - if (TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) { + if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) { break; } diff --git a/source/client/src/taos.def b/source/client/src/taos.def index bc78b241d2..994dd75090 100644 --- a/source/client/src/taos.def +++ b/source/client/src/taos.def @@ -1,6 +1,7 @@ taos_cleanup taos_options taos_set_config +taos_init taos_connect taos_connect_l taos_connect_auth diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 63e009ed0a..f30a74bf11 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -225,12 +225,16 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co // Handle the bitmap char* p = taosMemoryRealloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * (numOfRow1 + numOfRow2)); if (p == NULL) { - // TODO + return TSDB_CODE_OUT_OF_MEMORY; } pColumnInfoData->varmeta.offset = (int32_t*)p; for (int32_t i = 0; i < numOfRow2; ++i) { - pColumnInfoData->varmeta.offset[i + numOfRow1] = pSource->varmeta.offset[i] + pColumnInfoData->varmeta.length; + if (pSource->varmeta.offset[i] == -1) { + pColumnInfoData->varmeta.offset[i + numOfRow1] = -1; + } else { + pColumnInfoData->varmeta.offset[i + numOfRow1] = pSource->varmeta.offset[i] + pColumnInfoData->varmeta.length; + } } // copy data @@ -239,7 +243,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co if (pColumnInfoData->varmeta.allocLen < len + oldLen) { char* tmp = taosMemoryRealloc(pColumnInfoData->pData, len + oldLen); if (tmp == NULL) { - return TSDB_CODE_VND_OUT_OF_MEMORY; + return TSDB_CODE_OUT_OF_MEMORY; } pColumnInfoData->pData = tmp; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 6c84fe6be7..afab703e77 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3231,7 +3231,7 @@ int32_t tEncodeSMqCMCommitOffsetReq(SCoder *encoder, const SMqCMCommitOffsetReq int32_t tDecodeSMqCMCommitOffsetReq(SCoder *decoder, SMqCMCommitOffsetReq *pReq) { if (tStartDecode(decoder) < 0) return -1; if (tDecodeI32(decoder, &pReq->num) < 0) return -1; - pReq->offsets = (SMqOffset *)TCODER_MALLOC(decoder, sizeof(SMqOffset) * pReq->num); + pReq->offsets = (SMqOffset *)tCoderMalloc(decoder, sizeof(SMqOffset) * pReq->num); if (pReq->offsets == NULL) return -1; for (int32_t i = 0; i < pReq->num; i++) { tDecodeSMqOffset(decoder, &pReq->offsets[i]); @@ -3514,7 +3514,7 @@ int tDecodeSVCreateTbBatchRsp(SCoder *pCoder, SVCreateTbBatchRsp *pRsp) { if (tStartDecode(pCoder) < 0) return -1; if (tDecodeI32v(pCoder, &pRsp->nRsps) < 0) return -1; - pRsp->pRsps = (SVCreateTbRsp *)TCODER_MALLOC(pCoder, sizeof(*pRsp->pRsps) * pRsp->nRsps); + pRsp->pRsps = (SVCreateTbRsp *)tCoderMalloc(pCoder, sizeof(*pRsp->pRsps) * pRsp->nRsps); for (int32_t i = 0; i < pRsp->nRsps; i++) { if (tDecodeSVCreateTbRsp(pCoder, pRsp->pRsps + i) < 0) return -1; } @@ -3621,6 +3621,43 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) { taosMemoryFreeClear(pReq->ast); } +int32_t tEncodeSRSmaParam(SCoder *pCoder, const SRSmaParam *pRSmaParam) { + if (tEncodeFloat(pCoder, pRSmaParam->xFilesFactor) < 0) return -1; + if (tEncodeI32v(pCoder, pRSmaParam->delay) < 0) return -1; + if (tEncodeI32v(pCoder, pRSmaParam->qmsg1Len) < 0) return -1; + if (tEncodeI32v(pCoder, pRSmaParam->qmsg2Len) < 0) return -1; + if (pRSmaParam->qmsg1Len > 0) { + if (tEncodeBinary(pCoder, pRSmaParam->qmsg1, (uint64_t)pRSmaParam->qmsg1Len) < 0) // qmsg1Len contains len of '\0' + return -1; + } + if (pRSmaParam->qmsg2Len > 0) { + if (tEncodeBinary(pCoder, pRSmaParam->qmsg2, (uint64_t)pRSmaParam->qmsg2Len) < 0) // qmsg2Len contains len of '\0' + return -1; + } + + return 0; +} + +int32_t tDecodeSRSmaParam(SCoder *pCoder, SRSmaParam *pRSmaParam) { + if (tDecodeFloat(pCoder, &pRSmaParam->xFilesFactor) < 0) return -1; + if (tDecodeI32v(pCoder, &pRSmaParam->delay) < 0) return -1; + if (tDecodeI32v(pCoder, &pRSmaParam->qmsg1Len) < 0) return -1; + if (tDecodeI32v(pCoder, &pRSmaParam->qmsg2Len) < 0) return -1; + if (pRSmaParam->qmsg1Len > 0) { + uint64_t len; + if (tDecodeBinaryAlloc(pCoder, (void **)&pRSmaParam->qmsg1, &len) < 0) return -1; // qmsg1Len contains len of '\0' + } else { + pRSmaParam->qmsg1 = NULL; + } + if (pRSmaParam->qmsg2Len > 0) { + uint64_t len; + if (tDecodeBinaryAlloc(pCoder, (void **)&pRSmaParam->qmsg2, &len) < 0) return -1; // qmsg2Len contains len of '\0' + } else { + pRSmaParam->qmsg2 = NULL; + } + return 0; +} + int tEncodeSVCreateStbReq(SCoder *pCoder, const SVCreateStbReq *pReq) { if (tStartEncode(pCoder) < 0) return -1; @@ -3629,9 +3666,9 @@ int tEncodeSVCreateStbReq(SCoder *pCoder, const SVCreateStbReq *pReq) { if (tEncodeI8(pCoder, pReq->rollup) < 0) return -1; if (tEncodeSSchemaWrapper(pCoder, &pReq->schema) < 0) return -1; if (tEncodeSSchemaWrapper(pCoder, &pReq->schemaTag) < 0) return -1; - // if (pReq->rollup) { - // if (tEncodeSRSmaParam(pCoder, pReq->pRSmaParam) < 0) return -1; - // } + if (pReq->rollup) { + if (tEncodeSRSmaParam(pCoder, &pReq->pRSmaParam) < 0) return -1; + } tEndEncode(pCoder); return 0; @@ -3645,9 +3682,9 @@ int tDecodeSVCreateStbReq(SCoder *pCoder, SVCreateStbReq *pReq) { if (tDecodeI8(pCoder, &pReq->rollup) < 0) return -1; if (tDecodeSSchemaWrapper(pCoder, &pReq->schema) < 0) return -1; if (tDecodeSSchemaWrapper(pCoder, &pReq->schemaTag) < 0) return -1; - // if (pReq->rollup) { - // if (tDecodeSRSmaParam(pCoder, pReq->pRSmaParam) < 0) return -1; - // } + if (pReq->rollup) { + if (tDecodeSRSmaParam(pCoder, &pReq->pRSmaParam) < 0) return -1; + } tEndDecode(pCoder); return 0; @@ -3743,7 +3780,7 @@ int tDecodeSVCreateTbBatchReq(SCoder *pCoder, SVCreateTbBatchReq *pReq) { if (tStartDecode(pCoder) < 0) return -1; if (tDecodeI32v(pCoder, &pReq->nReqs) < 0) return -1; - pReq->pReqs = (SVCreateTbReq *)TCODER_MALLOC(pCoder, sizeof(SVCreateTbReq) * pReq->nReqs); + pReq->pReqs = (SVCreateTbReq *)tCoderMalloc(pCoder, sizeof(SVCreateTbReq) * pReq->nReqs); if (pReq->pReqs == NULL) return -1; for (int iReq = 0; iReq < pReq->nReqs; iReq++) { if (tDecodeSVCreateTbReq(pCoder, pReq->pReqs + iReq) < 0) return -1; diff --git a/source/common/src/trow.c b/source/common/src/trow.c index c73f26e6da..7157c1e0f0 100644 --- a/source/common/src/trow.c +++ b/source/common/src/trow.c @@ -220,7 +220,7 @@ static uint8_t tdGetMergedBitmapByte(uint8_t byte) { } /** - * @brief Merge bitmap from 2 bits to 1 bits, and the memory buffer should be guaranteed by the invoker. + * @brief Merge bitmap from 2 bits to 1 bit, and the memory buffer should be guaranteed by the invoker. * * @param srcBitmap * @param nBits diff --git a/source/common/src/ttypes.c b/source/common/src/ttypes.c index 1c7a7986e9..e3d67cd488 100644 --- a/source/common/src/ttypes.c +++ b/source/common/src/ttypes.c @@ -651,35 +651,35 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void *buf switch (type) { case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_UINT: { - TSWAP(*(int32_t *)(pLeft), *(int32_t *)(pRight), int32_t); + TSWAP(*(int32_t *)(pLeft), *(int32_t *)(pRight)); break; } case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_UBIGINT: case TSDB_DATA_TYPE_TIMESTAMP: { - TSWAP(*(int64_t *)(pLeft), *(int64_t *)(pRight), int64_t); + TSWAP(*(int64_t *)(pLeft), *(int64_t *)(pRight)); break; } case TSDB_DATA_TYPE_DOUBLE: { - TSWAP(*(double *)(pLeft), *(double *)(pRight), double); + TSWAP(*(double *)(pLeft), *(double *)(pRight)); break; } case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_USMALLINT: { - TSWAP(*(int16_t *)(pLeft), *(int16_t *)(pRight), int16_t); + TSWAP(*(int16_t *)(pLeft), *(int16_t *)(pRight)); break; } case TSDB_DATA_TYPE_FLOAT: { - TSWAP(*(float *)(pLeft), *(float *)(pRight), float); + TSWAP(*(float *)(pLeft), *(float *)(pRight)); break; } case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_UTINYINT: { - TSWAP(*(int8_t *)(pLeft), *(int8_t *)(pRight), int8_t); + TSWAP(*(int8_t *)(pLeft), *(int8_t *)(pRight)); break; } diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 3b6b264d1b..41bcef25d1 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -395,7 +395,9 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt } } // get length - if (tEncodeSize(tEncodeSVCreateStbReq, &req, contLen) < 0) { + int32_t ret = 0; + tEncodeSize(tEncodeSVCreateStbReq, &req, contLen, ret); + if (ret < 0) { return NULL; } diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 261d334de2..f739810065 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -230,8 +230,8 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew) { memcpy(pOld->pass, pNew->pass, TSDB_PASSWORD_LEN); pOld->updateTime = pNew->updateTime; - TSWAP(pOld->readDbs, pNew->readDbs, (void *)); - TSWAP(pOld->writeDbs, pNew->writeDbs, (void *)); + TSWAP(pOld->readDbs, pNew->readDbs); + TSWAP(pOld->writeDbs, pNew->writeDbs); return 0; } diff --git a/source/dnode/vnode/src/inc/meta.h b/source/dnode/vnode/src/inc/meta.h index 9a0b1f4578..0001a43231 100644 --- a/source/dnode/vnode/src/inc/meta.h +++ b/source/dnode/vnode/src/inc/meta.h @@ -77,21 +77,25 @@ typedef struct { tb_uid_t uid; } STbDbKey; -typedef struct __attribute__((__packed__)) { +#pragma pack(push, 1) +typedef struct { tb_uid_t uid; int32_t sver; } SSkmDbKey; +#pragma pack(pop) typedef struct { tb_uid_t suid; tb_uid_t uid; } SCtbIdxKey; -typedef struct __attribute__((__packed__)) { +#pragma pack(push, 1) +typedef struct { tb_uid_t suid; int16_t cid; char data[]; } STagIdxKey; +#pragma pack(pop) typedef struct { int64_t dtime; diff --git a/source/dnode/vnode/src/inc/tsdbSma.h b/source/dnode/vnode/src/inc/tsdbSma.h index 8fb18ddfea..162d733cc3 100644 --- a/source/dnode/vnode/src/inc/tsdbSma.h +++ b/source/dnode/vnode/src/inc/tsdbSma.h @@ -22,13 +22,13 @@ extern "C" { #endif -typedef int32_t (*__tb_ddl_fn_t)(void *ahandle, void **result, void *p1, void *p2); +// typedef int32_t (*__tb_ddl_fn_t)(void *ahandle, void **result, void *p1, void *p2); -struct STbDdlH { - void *ahandle; - void *result; - __tb_ddl_fn_t fp; -}; +// struct STbDdlH { +// void *ahandle; +// void *result; +// __tb_ddl_fn_t fp; +// }; static FORCE_INLINE int32_t tsdbUidStoreInit(STbUidStore **pStore) { ASSERT(*pStore == NULL); @@ -40,14 +40,6 @@ static FORCE_INLINE int32_t tsdbUidStoreInit(STbUidStore **pStore) { return TSDB_CODE_SUCCESS; } -int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); -void tsdbUidStoreDestory(STbUidStore *pStore); -void *tsdbUidStoreFree(STbUidStore *pStore); - -int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq); -int32_t tsdbFetchTbUidList(void *pTsdb, void **result, void *suid, void *uid); -int32_t tsdbUpdateTbUidList(STsdb *pTsdb, STbUidStore *pUidStore); -int32_t tsdbTriggerRSma(STsdb *pTsdb, SMeta *pMeta, void *pMsg, int32_t inputType); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 884f86a3bb..b5066799ca 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -107,6 +107,15 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId); +// sma + +int32_t tsdbRegisterRSma(STsdb* pTsdb, SMeta* pMeta, SVCreateStbReq* pReq); +int32_t tsdbFetchTbUidList(STsdb* pTsdb, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid); +int32_t tsdbUpdateTbUidList(STsdb* pTsdb, STbUidStore* pUidStore); +void tsdbUidStoreDestory(STbUidStore* pStore); +void* tsdbUidStoreFree(STbUidStore* pStore); +int32_t tsdbTriggerRSma(STsdb* pTsdb, SMeta* pMeta, void* pMsg, int32_t inputType); + typedef struct { int8_t streamType; // sma or other int8_t dstType; @@ -162,7 +171,7 @@ struct STbUidStore { #define TD_VID(PVNODE) (PVNODE)->config.vgId -typedef struct STbDdlH STbDdlH; +// typedef struct STbDdlH STbDdlH; // sma void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 7003b0d17a..3e70bbb479 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -158,7 +158,9 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) { pKey = &tbDbKey; kLen = sizeof(tbDbKey); - if (tEncodeSize(metaEncodeEntry, pME, vLen) < 0) { + int32_t ret = 0; + tEncodeSize(metaEncodeEntry, pME, vLen, ret); + if (ret < 0) { goto _err; } @@ -250,7 +252,9 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) { skmDbKey.sver = pSW->sver; // encode schema - if (tEncodeSize(tEncodeSSchemaWrapper, pSW, vLen) < 0) return -1; + int32_t ret = 0; + tEncodeSize(tEncodeSSchemaWrapper, pSW, vLen, ret); + if (ret < 0) return -1; pVal = taosMemoryMalloc(vLen); if (pVal == NULL) { rcode = -1; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 302ee89e51..638e1e3a1c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -454,7 +454,7 @@ void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond) { if (emptyQueryTimewindow(pTsdbReadHandle)) { if (pCond->order != pTsdbReadHandle->order) { pTsdbReadHandle->order = pCond->order; - TSWAP(pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, int64_t); + TSWAP(pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey); } return; @@ -924,7 +924,7 @@ static bool hasMoreDataInCache(STsdbReadHandle* pHandle) { pHandle->cur.mixBlock = true; if (!ASCENDING_TRAVERSE(pHandle->order)) { - TSWAP(win->skey, win->ekey, TSKEY); + TSWAP(win->skey, win->ekey); } return true; @@ -1203,7 +1203,7 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* // update the last key value pCheckInfo->lastKey = cur->win.ekey + step; if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { - TSWAP(cur->win.skey, cur->win.ekey, TSKEY); + TSWAP(cur->win.skey, cur->win.ekey); } cur->mixBlock = true; @@ -1519,8 +1519,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit } else if (isRow1DataRow) { colIdOfRow1 = pSchema1->columns[j].colId; } else { - SKvRowIdx* pColIdx = tdKvRowColIdxAt(row1, j); - colIdOfRow1 = pColIdx->colId; + colIdOfRow1 = tdKvRowColIdAt(row1, j); } int32_t colIdOfRow2; @@ -1529,8 +1528,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit } else if (isRow2DataRow) { colIdOfRow2 = pSchema2->columns[k].colId; } else { - SKvRowIdx* pColIdx = tdKvRowColIdxAt(row2, k); - colIdOfRow2 = pColIdx->colId; + colIdOfRow2 = tdKvRowColIdAt(row2, k); } if (colIdOfRow1 == colIdOfRow2) { @@ -1701,7 +1699,7 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa int32_t end = endPos; if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { - TSWAP(start, end, int32_t); + TSWAP(start, end); } assert(pTsdbReadHandle->outputCapacity >= (end - start + 1)); @@ -1932,7 +1930,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf ((pos < endPos || cur->lastKey < pTsdbReadHandle->window.ekey) && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))); if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { - TSWAP(cur->win.skey, cur->win.ekey, TSKEY); + TSWAP(cur->win.skey, cur->win.ekey); } moveDataToFront(pTsdbReadHandle, numOfRows, numOfCols); diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index c9e33cefc7..1abca21d34 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -173,6 +173,7 @@ static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char *msg); static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg); +static FORCE_INLINE int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); static FORCE_INLINE int32_t tsdbUpdateTbUidListImpl(STsdb *pTsdb, tb_uid_t *suid, SArray *tbUids); // mgmt interface static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid); @@ -1692,18 +1693,16 @@ int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) { * @param pReq * @return int32_t */ -int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) { -#if 0 - SRSmaParam *param = pReq->stbCfg.pRSmaParam; - - if (!param) { - tsdbDebug("vgId:%d return directly since no rollup for stable %s %" PRIi64, REPO_ID(pTsdb), pReq->name, - pReq->stbCfg.suid); +int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateStbReq *pReq) { + if (!pReq->rollup) { + tsdbDebug("vgId:%d return directly since no rollup for stable %s %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->suid); return TSDB_CODE_SUCCESS; } + SRSmaParam *param = &pReq->pRSmaParam; + if ((param->qmsg1Len == 0) && (param->qmsg2Len == 0)) { - tsdbWarn("vgId:%d no qmsg1/qmsg2 for rollup stable %s %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->stbCfg.suid); + tsdbWarn("vgId:%d no qmsg1/qmsg2 for rollup stable %s %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->suid); return TSDB_CODE_SUCCESS; } @@ -1716,9 +1715,9 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) { SSmaStat *pStat = SMA_ENV_STAT(pEnv); SRSmaInfo *pRSmaInfo = NULL; - pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &pReq->stbCfg.suid, sizeof(tb_uid_t)); + pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t)); if (pRSmaInfo) { - tsdbWarn("vgId:%d rsma info already exists for stb: %s, %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->stbCfg.suid); + tsdbWarn("vgId:%d rsma info already exists for stb: %s, %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->suid); return TSDB_CODE_SUCCESS; } @@ -1758,14 +1757,13 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) { } } - if (taosHashPut(SMA_STAT_INFO_HASH(pStat), &pReq->stbCfg.suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) != + if (taosHashPut(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) != TSDB_CODE_SUCCESS) { return TSDB_CODE_FAILED; } else { - tsdbDebug("vgId:%d register rsma info succeed for suid:%" PRIi64, REPO_ID(pTsdb), pReq->stbCfg.suid); + tsdbDebug("vgId:%d register rsma info succeed for suid:%" PRIi64, REPO_ID(pTsdb), pReq->suid); } -#endif return TSDB_CODE_SUCCESS; } @@ -1777,7 +1775,7 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) { * @param uid * @return int32_t */ -int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid) { +static int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid) { // prefer to store suid/uids in array if ((suid == pStore->suid) || (pStore->suid == 0)) { if (pStore->suid == 0) { @@ -1833,6 +1831,7 @@ void tsdbUidStoreDestory(STbUidStore *pStore) { if (pStore) { if (pStore->uidHash) { if (pStore->tbUids) { + // When pStore->tbUids not NULL, the pStore->uidHash has k/v; otherwise pStore->uidHash only has keys. void *pIter = taosHashIterate(pStore->uidHash, NULL); while (pIter) { SArray *arr = *(SArray **)pIter; @@ -1847,8 +1846,10 @@ void tsdbUidStoreDestory(STbUidStore *pStore) { } void *tsdbUidStoreFree(STbUidStore *pStore) { - tsdbUidStoreDestory(pStore); - taosMemoryFree(pStore); + if (pStore) { + tsdbUidStoreDestory(pStore); + taosMemoryFree(pStore); + } return NULL; } @@ -1861,7 +1862,7 @@ void *tsdbUidStoreFree(STbUidStore *pStore) { * @param uid * @return int32_t */ -int32_t tsdbFetchTbUidList(void *pTsdb, void **ppStore, void *suid, void *uid) { +int32_t tsdbFetchTbUidList(STsdb *pTsdb, STbUidStore **ppStore, tb_uid_t suid, tb_uid_t uid) { SSmaEnv *pEnv = REPO_RSMA_ENV((STsdb *)pTsdb); // only applicable to rollup SMA ctables @@ -1877,7 +1878,7 @@ int32_t tsdbFetchTbUidList(void *pTsdb, void **ppStore, void *suid, void *uid) { } // info cached when create rsma stable and return directly for non-rsma ctables - if (!taosHashGet(infoHash, suid, sizeof(tb_uid_t))) { + if (!taosHashGet(infoHash, &suid, sizeof(tb_uid_t))) { return TSDB_CODE_SUCCESS; } @@ -1887,7 +1888,7 @@ int32_t tsdbFetchTbUidList(void *pTsdb, void **ppStore, void *suid, void *uid) { } } - if (tsdbUidStorePut(*ppStore, *(tb_uid_t *)suid, (tb_uid_t *)uid) != 0) { + if (tsdbUidStorePut(*ppStore, suid, &uid) != 0) { *ppStore = tsdbUidStoreFree(*ppStore); return TSDB_CODE_FAILED; } @@ -1935,12 +1936,10 @@ static FORCE_INLINE int32_t tsdbUpdateTbUidListImpl(STsdb *pTsdb, tb_uid_t *suid int32_t tsdbUpdateTbUidList(STsdb *pTsdb, STbUidStore *pStore) { if (!pStore || (taosArrayGetSize(pStore->tbUids) == 0)) { tsdbDebug("vgId:%d no need to update tbUids since empty uidStore", REPO_ID(pTsdb)); - tsdbUidStoreFree(pStore); return TSDB_CODE_SUCCESS; } if (tsdbUpdateTbUidListImpl(pTsdb, &pStore->suid, pStore->tbUids) != TSDB_CODE_SUCCESS) { - tsdbUidStoreFree(pStore); return TSDB_CODE_FAILED; } @@ -1951,15 +1950,11 @@ int32_t tsdbUpdateTbUidList(STsdb *pTsdb, STbUidStore *pStore) { if (tsdbUpdateTbUidListImpl(pTsdb, pTbSuid, pTbUids) != TSDB_CODE_SUCCESS) { taosHashCancelIterate(pStore->uidHash, pIter); - tsdbUidStoreFree(pStore); return TSDB_CODE_FAILED; } pIter = taosHashIterate(pStore->uidHash, pIter); } - - tsdbUidStoreFree(pStore); - return TSDB_CODE_SUCCESS; } @@ -1971,8 +1966,6 @@ static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) { STSRow *row = NULL; terrno = TSDB_CODE_SUCCESS; - // pMsg->length = htonl(pMsg->length); - // pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); if (tInitSubmitMsgIterEx(pMsg, &msgIter) < 0) return -1; while (true) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index de06f93c64..7f7023fccd 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -300,13 +300,13 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, goto _err; } - // tsdbRegisterRSma(pVnode->pTsdb, pVnode->pMeta, &vCreateTbReq); - if (metaCreateSTable(pVnode->pMeta, version, &req) < 0) { pRsp->code = terrno; goto _err; } + tsdbRegisterRSma(pVnode->pTsdb, pVnode->pMeta, &req); + tCoderClear(&coder); return 0; @@ -323,6 +323,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, SVCreateTbBatchRsp rsp = {0}; SVCreateTbRsp cRsp = {0}; char tbName[TSDB_TABLE_FNAME_LEN]; + STbUidStore *pStore = NULL; pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP; pRsp->code = TSDB_CODE_SUCCESS; @@ -361,6 +362,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, cRsp.code = terrno; } else { cRsp.code = TSDB_CODE_SUCCESS; + tsdbFetchTbUidList(pVnode->pTsdb, &pStore, pCreateReq->ctb.suid, pCreateReq->uid); } taosArrayPush(rsp.pArray, &cRsp); @@ -368,8 +370,12 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, tCoderClear(&coder); + tsdbUpdateTbUidList(pVnode->pTsdb, pStore); + tsdbUidStoreFree(pStore); + // prepare rsp - tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen); + int32_t ret = 0; + tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret); pRsp->pCont = rpcMallocCont(pRsp->contLen); if (pRsp->pCont == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -425,7 +431,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in SSubmitRsp rsp = {0}; pRsp->code = 0; - + tsdbTriggerRSma(pVnode->pTsdb, pVnode->pMeta, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK); // handle the request if (tsdbInsertData(pVnode->pTsdb, version, pSubmitReq, &rsp) < 0) { pRsp->code = terrno; @@ -434,7 +440,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in // pRsp->msgType = TDMT_VND_SUBMIT_RSP; // vnodeProcessSubmitReq(pVnode, ptr, pRsp); - // tsdbTriggerRSma(pVnode->pTsdb, pVnode->pMeta, ptr, STREAM_DATA_TYPE_SUBMIT_BLOCK); + // tsdbTriggerRSma(pVnode->pTsdb, pVnode->pMeta, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK); // encode the response (TODO) pRsp->pCont = rpcMallocCont(sizeof(SSubmitRsp)); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index f6b1839f68..f1995b723d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2100,7 +2100,7 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) { // // pQueryAttr->order.order = TSDB_ORDER_ASC; // if (pQueryAttr->window.skey > pQueryAttr->window.ekey) { -// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY); +// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey); // } // // pQueryAttr->needReverseScan = false; @@ -2110,7 +2110,7 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) { // if (pQueryAttr->groupbyColumn && pQueryAttr->order.order == TSDB_ORDER_DESC) { // pQueryAttr->order.order = TSDB_ORDER_ASC; // if (pQueryAttr->window.skey > pQueryAttr->window.ekey) { -// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY); +// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey); // } // // pQueryAttr->needReverseScan = false; @@ -2135,7 +2135,7 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) { // //qDebug(msg, pQInfo->qId, "only-first", pQueryAttr->order.order, TSDB_ORDER_ASC, pQueryAttr->window.skey, //// pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey); // -// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY); +// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey); // doUpdateLastKey(pQueryAttr); // } // @@ -2146,7 +2146,7 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) { // //qDebug(msg, pQInfo->qId, "only-last", pQueryAttr->order.order, TSDB_ORDER_DESC, pQueryAttr->window.skey, //// pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey); // -// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY); +// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey); // doUpdateLastKey(pQueryAttr); // } // @@ -2162,7 +2162,7 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) { //// pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey, /// pQueryAttr->window.skey); // -// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY); +// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey); // doUpdateLastKey(pQueryAttr); // } // @@ -2174,7 +2174,7 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) { //// pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey, /// pQueryAttr->window.skey); // -// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY); +// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey); // doUpdateLastKey(pQueryAttr); // } // @@ -2673,7 +2673,7 @@ static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo) return; } - // TSWAP(pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey, TSKEY); + // TSWAP(pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey); // pTableQueryInfo->lastKey = pTableQueryInfo->win.skey; // SWITCH_ORDER(pTableQueryInfo->cur.order); @@ -4994,13 +4994,21 @@ static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock) pProjectInfo->curOffset = 0; } - if (pRes->info.rows >= pOperator->resultInfo.threshold) { + // check for the limitation in each group + if (pProjectInfo->limit.limit > 0 && pProjectInfo->curOutput + pRes->info.rows >= pProjectInfo->limit.limit) { + pRes->info.rows = (int32_t)(pProjectInfo->limit.limit - pProjectInfo->curOutput); - // check for the limitation in each group - if (pProjectInfo->limit.limit > 0 && pProjectInfo->curOutput + pRes->info.rows >= pProjectInfo->limit.limit) { - pRes->info.rows = (int32_t)(pProjectInfo->limit.limit - pProjectInfo->curOutput); + if (pProjectInfo->slimit.limit == -1 || pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput) { + pOperator->status = OP_EXEC_DONE; } + return PROJECT_RETRIEVE_DONE; + } + + // todo optimize performance + // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the + // they may not belong to the same group the limit/offset value is not valid in this case. + if (pRes->info.rows >= pOperator->resultInfo.threshold || pProjectInfo->slimit.offset != -1 || pProjectInfo->slimit.limit != -1) { return PROJECT_RETRIEVE_DONE; } else { // not full enough, continue to accumulate the output data in the buffer. return PROJECT_RETRIEVE_CONTINUE; @@ -6652,7 +6660,7 @@ static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableS //todo work around a problem, remove it later if ((pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey > pCond->twindow.ekey) || (pCond->order == TSDB_ORDER_DESC && pCond->twindow.skey < pCond->twindow.ekey)) { - TSWAP(pCond->twindow.skey, pCond->twindow.ekey, int64_t); + TSWAP(pCond->twindow.skey, pCond->twindow.ekey); } #endif diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 022ba9d872..8291826e69 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -266,7 +266,7 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction // setupQueryRangeForReverseScan(pTableScanInfo); STimeWindow* pTWindow = &pTableScanInfo->cond.twindow; - TSWAP(pTWindow->skey, pTWindow->ekey, int64_t); + TSWAP(pTWindow->skey, pTWindow->ekey); pTableScanInfo->cond.order = TSDB_ORDER_DESC; } diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index dfeb2df911..62bb96f1cb 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -307,7 +307,7 @@ static int32_t buildOutput(SInsertParseContext* pCxt) { taosHashGetDup(pCxt->pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg); dst->numOfTables = src->numOfTables; dst->size = src->size; - TSWAP(dst->pData, src->pData, char*); + TSWAP(dst->pData, src->pData); buildMsgHeader(src, dst); taosArrayPush(pCxt->pOutput->pDataBlocks, &dst); } @@ -1069,7 +1069,6 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && tbNum > 0) { return buildInvalidOperationMsg(&pCxt->msg, "single table allowed in one stmt"); - ; } destroyInsertParseContextForTable(pCxt); diff --git a/source/libs/parser/src/parInsertData.c b/source/libs/parser/src/parInsertData.c index aee4566fa0..ae05d2293e 100644 --- a/source/libs/parser/src/parInsertData.c +++ b/source/libs/parser/src/parInsertData.c @@ -159,9 +159,10 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq) { SCoder coder = {0}; char* pBuf; - int32_t len; + int32_t len; - tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len); + int32_t ret = 0; + tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, ret); if (pBlocks->nAllocSize - pBlocks->size < len) { pBlocks->nAllocSize += len + pBlocks->rowSize; char* pTmp = taosMemoryRealloc(pBlocks->pData, pBlocks->nAllocSize); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 96a5d8a291..8502703f4b 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -540,8 +540,8 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) { return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName); } if ((TSDB_DATA_TYPE_TIMESTAMP == ldt.type && TSDB_DATA_TYPE_TIMESTAMP == rdt.type) || - (TSDB_DATA_TYPE_TIMESTAMP == ldt.type && IS_VAR_DATA_TYPE(rdt.type)) || - (TSDB_DATA_TYPE_TIMESTAMP == rdt.type && IS_VAR_DATA_TYPE(ldt.type))) { + (TSDB_DATA_TYPE_TIMESTAMP == ldt.type && (IS_VAR_DATA_TYPE(rdt.type) || IS_FLOAT_TYPE(rdt.type))) || + (TSDB_DATA_TYPE_TIMESTAMP == rdt.type && (IS_VAR_DATA_TYPE(ldt.type) || IS_FLOAT_TYPE(ldt.type)))) { return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName); } @@ -1973,10 +1973,10 @@ static int32_t buildSampleAst(STranslateContext* pCxt, SSampleAstInfo* pInfo, ch } strcpy(pTable->table.dbName, pInfo->pDbName); strcpy(pTable->table.tableName, pInfo->pTableName); - TSWAP(pTable->pMeta, pInfo->pRollupTableMeta, STableMeta*); + TSWAP(pTable->pMeta, pInfo->pRollupTableMeta); pSelect->pFromTable = (SNode*)pTable; - TSWAP(pSelect->pProjectionList, pInfo->pFuncs, SNodeList*); + TSWAP(pSelect->pProjectionList, pInfo->pFuncs); SFunctionNode* pFunc = nodesMakeNode(QUERY_NODE_FUNCTION); if (NULL == pSelect->pProjectionList || NULL == pFunc) { nodesDestroyNode(pSelect); @@ -1993,9 +1993,9 @@ static int32_t buildSampleAst(STranslateContext* pCxt, SSampleAstInfo* pInfo, ch return TSDB_CODE_OUT_OF_MEMORY; } pSelect->pWindow = (SNode*)pInterval; - TSWAP(pInterval->pInterval, pInfo->pInterval, SNode*); - TSWAP(pInterval->pOffset, pInfo->pOffset, SNode*); - TSWAP(pInterval->pSliding, pInfo->pSliding, SNode*); + TSWAP(pInterval->pInterval, pInfo->pInterval); + TSWAP(pInterval->pOffset, pInfo->pOffset); + TSWAP(pInterval->pSliding, pInfo->pSliding); pInterval->pCol = nodesMakeNode(QUERY_NODE_COLUMN); if (NULL == pInterval->pCol) { nodesDestroyNode(pSelect); @@ -3184,7 +3184,8 @@ static int32_t serializeVgroupTablesBatch(SVgroupTablesBatch* pTbBatch, SArray* int tlen; SCoder coder = {0}; - tEncodeSize(tEncodeSVCreateTbBatchReq, &pTbBatch->req, tlen); + int32_t ret = 0; + tEncodeSize(tEncodeSVCreateTbBatchReq, &pTbBatch->req, tlen, ret); tlen += sizeof(SMsgHead); //+ tSerializeSVCreateTbBatchReq(NULL, &(pTbBatch->req)); void* buf = taosMemoryMalloc(tlen); if (NULL == buf) { @@ -3598,7 +3599,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { default: pQuery->execMode = QUERY_EXEC_MODE_RPC; if (NULL != pCxt->pCmdMsg) { - TSWAP(pQuery->pCmdMsg, pCxt->pCmdMsg, SCmdMsgInfo*); + TSWAP(pQuery->pCmdMsg, pCxt->pCmdMsg); pQuery->msgType = pQuery->pCmdMsg->msgType; } break; diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 450fc307ea..18e59859ac 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -251,8 +251,8 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect return TSDB_CODE_OUT_OF_MEMORY; } - TSWAP(pScan->pMeta, pRealTable->pMeta, STableMeta*); - TSWAP(pScan->pVgroupList, pRealTable->pVgroupList, SVgroupsInfo*); + TSWAP(pScan->pMeta, pRealTable->pMeta); + TSWAP(pScan->pVgroupList, pRealTable->pVgroupList); pScan->scanSeq[0] = 1; pScan->scanSeq[1] = 0; pScan->scanRange = TSWINDOW_INITIALIZER; @@ -954,7 +954,7 @@ static int32_t createVnodeModifLogicNode(SLogicPlanContext* pCxt, SVnodeModifOpS if (NULL == pModif) { return TSDB_CODE_OUT_OF_MEMORY; } - TSWAP(pModif->pDataBlocks, pStmt->pDataBlocks, SArray*); + TSWAP(pModif->pDataBlocks, pStmt->pDataBlocks); pModif->msgType = getMsgType(pStmt->sqlNodeType); *pLogicNode = (SLogicNode*)pModif; return TSDB_CODE_SUCCESS; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index f1b16353b6..042b914927 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -246,7 +246,7 @@ static int32_t cpdMergeConds(SNode** pDst, SNodeList** pSrc) { static int32_t cpdCondAppend(SNode** pCond, SNode** pAdditionalCond) { if (NULL == *pCond) { - TSWAP(*pCond, *pAdditionalCond, SNode*); + TSWAP(*pCond, *pAdditionalCond); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 707e6aac14..548957ab8e 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1097,7 +1097,7 @@ static int32_t createDataInserter(SPhysiPlanContext* pCxt, SVgDataBlocks* pBlock pInserter->numOfTables = pBlocks->numOfTables; pInserter->size = pBlocks->size; - TSWAP(pInserter->pData, pBlocks->pData, char*); + TSWAP(pInserter->pData, pBlocks->pData); *pSink = (SDataSinkNode*)pInserter; return TSDB_CODE_SUCCESS; diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 51bd36f9f9..1266e8ae4b 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -65,7 +65,7 @@ static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode* pSubplan->id.groupId = pCxt->groupId; pSubplan->subplanType = SUBPLAN_TYPE_SCAN; pSubplan->pNode = (SLogicNode*)nodesCloneNode(pScan); - TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pSubplan->pNode)->pVgroupList, SVgroupsInfo*); + TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pSubplan->pNode)->pVgroupList); SPLIT_FLAG_SET_MASK(pSubplan->splitFlag, flag); return pSubplan; } @@ -406,8 +406,7 @@ int32_t splitLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SLogicSubplan } if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIF == nodeType(pLogicNode)) { pSubplan->subplanType = SUBPLAN_TYPE_MODIFY; - TSWAP(((SVnodeModifLogicNode*)pLogicNode)->pDataBlocks, ((SVnodeModifLogicNode*)pSubplan->pNode)->pDataBlocks, - SArray*); + TSWAP(((SVnodeModifLogicNode*)pLogicNode)->pDataBlocks, ((SVnodeModifLogicNode*)pSubplan->pNode)->pDataBlocks); } else { pSubplan->subplanType = SUBPLAN_TYPE_SCAN; } diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 28514c3605..94b84c5861 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -133,7 +133,12 @@ static int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SS colDataAppendNULL(pOutputData, i); continue; } - out[i] = valFn(getValueFn(pInputData->pData, i)); + double result = valFn(getValueFn(pInputData->pData, i)); + if (isinf(result) || isnan(result)) { + colDataAppendNULL(pOutputData, i); + } else { + out[i] = result; + } } pOutput->numOfRows = pInput->numOfRows; @@ -162,7 +167,12 @@ static int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, S colDataAppendNULL(pOutputData, i); continue; } - out[i] = valFn(getValueFn[0](pInputData[0]->pData, i), getValueFn[1](pInputData[1]->pData, 0)); + double result = valFn(getValueFn[0](pInputData[0]->pData, i), getValueFn[1](pInputData[1]->pData, 0)); + if (isinf(result) || isnan(result)) { + colDataAppendNULL(pOutputData, i); + } else { + out[i] = result; + } } pOutput->numOfRows = pInput->numOfRows; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index c0da3f9c1f..fecb5d9279 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -49,6 +49,10 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->connType = pInit->connType; pRpc->idleTime = pInit->idleTime; pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); + if (pRpc->tcphandle == NULL) { + taosMemoryFree(pRpc); + return NULL; + } pRpc->parent = pInit->parent; if (pInit->user) { memcpy(pRpc->user, pInit->user, strlen(pInit->user)); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index dcee65ed21..20406763de 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -912,7 +912,6 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { cliDestroy((uv_handle_t*)pConn->stream); return -1; } - } else if (pCtx->retryCount < TRANS_RETRY_COUNT_LIMIT) { if (pResp->contLen == 0) { pEpSet->inUse = (pEpSet->inUse++) % pEpSet->numOfEps; diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index c02cb07101..b78edf6cd0 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -93,6 +93,8 @@ typedef struct SServerObj { uint32_t ip; uint32_t port; uv_async_t* pAcceptAsync; // just to quit from from accept thread + + bool inited; } SServerObj; // handle @@ -143,7 +145,7 @@ static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleR static int32_t exHandlesMgt; -void uvInitExHandleMgt(); +void uvInitEnv(); void uvOpenExHandleMgt(int size); void uvCloseExHandleMgt(); int64_t uvAddExHandle(void* p); @@ -716,6 +718,7 @@ static bool addHandleToAcceptloop(void* arg) { } if ((err = uv_listen((uv_stream_t*)&srv->server, 512, uvOnAcceptCb)) != 0) { tError("failed to listen: %s", uv_err_name(err)); + terrno = TSDB_CODE_RPC_PORT_EADDRINUSE; return false; } return true; @@ -800,7 +803,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, srv->port = port; uv_loop_init(srv->loop); - taosThreadOnce(&transModuleInit, uvInitExHandleMgt); + taosThreadOnce(&transModuleInit, uvInitEnv); transSrvInst++; for (int i = 0; i < srv->numOfThreads; i++) { @@ -844,15 +847,15 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, goto End; // clear all resource later } - + srv->inited = true; return srv; End: transCloseServer(srv); return NULL; } -void uvInitExHandleMgt() { - // init exhandle mgt +void uvInitEnv() { + uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1"); uvOpenExHandleMgt(10000); } void uvOpenExHandleMgt(int size) { @@ -958,9 +961,10 @@ void transCloseServer(void* arg) { SServerObj* srv = arg; tDebug("send quit msg to accept thread"); - uv_async_send(srv->pAcceptAsync); - taosThreadJoin(srv->thread, NULL); - + if (srv->inited) { + uv_async_send(srv->pAcceptAsync); + taosThreadJoin(srv->thread, NULL); + } SRV_RELEASE_UV(srv->loop); for (int i = 0; i < srv->numOfThreads; i++) { diff --git a/source/os/src/osSemaphore.c b/source/os/src/osSemaphore.c index 9475625413..7df4c26afd 100644 --- a/source/os/src/osSemaphore.c +++ b/source/os/src/osSemaphore.c @@ -67,6 +67,12 @@ int32_t tsem_wait(tsem_t* sem) { return ret; } +int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { + int ret = 0; + + return ret; +} + #elif defined(_TD_DARWIN_64) /* diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 4926f4df87..8f2ba2dcc4 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -68,6 +68,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_TIME_STAMP, "Client and server's t TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, "Database not ready") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_VERSION, "Invalid app version") +TAOS_DEFINE_ERROR(TSDB_CODE_RPC_PORT_EADDRINUSE, "port already in use") //common & util TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_MEMORY, "Out of Memory") diff --git a/source/util/test/encodeTest.cpp b/source/util/test/encodeTest.cpp index 9ddbd24353..95ea6b1674 100644 --- a/source/util/test/encodeTest.cpp +++ b/source/util/test/encodeTest.cpp @@ -230,7 +230,7 @@ static int32_t tSStructA_v1_decode(SCoder *pCoder, SStructA_v1 *pSAV1) { const char *tstr; uint64_t len; if (tDecodeCStrAndLen(pCoder, &tstr, &len) < 0) return -1; - pSAV1->A_c = (char *)TCODER_MALLOC(pCoder, len + 1); + pSAV1->A_c = (char *)tCoderMalloc(pCoder, len + 1); memcpy(pSAV1->A_c, tstr, len + 1); tEndDecode(pCoder); @@ -269,7 +269,7 @@ static int32_t tSStructA_v2_decode(SCoder *pCoder, SStructA_v2 *pSAV2) { const char *tstr; uint64_t len; if (tDecodeCStrAndLen(pCoder, &tstr, &len) < 0) return -1; - pSAV2->A_c = (char *)TCODER_MALLOC(pCoder, len + 1); + pSAV2->A_c = (char *)tCoderMalloc(pCoder, len + 1); memcpy(pSAV2->A_c, tstr, len + 1); // ------------------------NEW FIELDS DECODE------------------------------- @@ -305,7 +305,7 @@ static int32_t tSFinalReq_v1_encode(SCoder *pCoder, const SFinalReq_v1 *ps1) { static int32_t tSFinalReq_v1_decode(SCoder *pCoder, SFinalReq_v1 *ps1) { if (tStartDecode(pCoder) < 0) return -1; - ps1->pA = (SStructA_v1 *)TCODER_MALLOC(pCoder, sizeof(*(ps1->pA))); + ps1->pA = (SStructA_v1 *)tCoderMalloc(pCoder, sizeof(*(ps1->pA))); if (tSStructA_v1_decode(pCoder, ps1->pA) < 0) return -1; if (tDecodeI32(pCoder, &ps1->v_a) < 0) return -1; if (tDecodeI8(pCoder, &ps1->v_b) < 0) return -1; @@ -339,7 +339,7 @@ static int32_t tSFinalReq_v2_encode(SCoder *pCoder, const SFinalReq_v2 *ps2) { static int32_t tSFinalReq_v2_decode(SCoder *pCoder, SFinalReq_v2 *ps2) { if (tStartDecode(pCoder) < 0) return -1; - ps2->pA = (SStructA_v2 *)TCODER_MALLOC(pCoder, sizeof(*(ps2->pA))); + ps2->pA = (SStructA_v2 *)tCoderMalloc(pCoder, sizeof(*(ps2->pA))); if (tSStructA_v2_decode(pCoder, ps2->pA) < 0) return -1; if (tDecodeI32(pCoder, &ps2->v_a) < 0) return -1; if (tDecodeI8(pCoder, &ps2->v_b) < 0) return -1; diff --git a/tests/system-test/0-others/taosShell.py b/tests/system-test/0-others/taosShell.py index 3c3508a5c3..fa94dea656 100644 --- a/tests/system-test/0-others/taosShell.py +++ b/tests/system-test/0-others/taosShell.py @@ -70,11 +70,12 @@ class TDTestCase: # 'serverPort': 7080, 'firstEp': 'trd02:7080'} hostname = socket.gethostname() serverPort = '7080' - clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':''} - clientCfgDict["serverPort"] = serverPort - clientCfgDict["firstEp"] = hostname + ':' + serverPort - clientCfgDict["secondEp"] = hostname + ':' + serverPort - + rpcDebugFlagVal = '143' + clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135'} + clientCfgDict["serverPort"] = serverPort + clientCfgDict["firstEp"] = hostname + ':' + serverPort + clientCfgDict["secondEp"] = hostname + ':' + serverPort + clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':''} updatecfgDict["clientCfg"] = clientCfgDict @@ -109,8 +110,8 @@ class TDTestCase: # time.sleep(2) tdSql.query("create user testpy pass 'testpy'") - hostname = socket.gethostname() - tdLog.info ("hostname: %s" % hostname) + #hostname = socket.gethostname() + #tdLog.info ("hostname: %s" % hostname) buildPath = self.getBuildPath() if (buildPath == ""): @@ -126,8 +127,9 @@ class TDTestCase: keyDict = {'h':'', 'P':'6030', 'p':'testpy', 'u':'testpy', 'a':'', 'A':'', 'c':'', 'C':'', 's':'', 'r':'', 'f':'', \ 'k':'', 't':'', 'n':'', 'l':'1024', 'N':'100', 'V':'', 'd':'db', 'w':'30', '-help':'', '-usage':'', '?':''} - keyDict['h'] = hostname + keyDict['h'] = self.hostname keyDict['c'] = cfgPath + keyDict['P'] = self.serverPort tdLog.printNoPrefix("================================ parameter: -h") newDbName="dbh" @@ -312,9 +314,25 @@ class TDTestCase: if retCode != "TAOS_OK": tdLog.exit("taos -C fail") - print ("-C return content:\n ", retVal) - + #print ("-C return content:\n ", retVal) + totalCfgItem = {"firstEp":['', '', ''], } + for line in retVal.splitlines(): + strList = line.split() + if (len(strList) > 2): + totalCfgItem[strList[1]] = strList + + #print ("dict content:\n ", totalCfgItem) + firstEp = keyDict["h"] + ':' + keyDict['P'] + if (totalCfgItem["firstEp"][2] != firstEp) and (totalCfgItem["firstEp"][0] != 'cfg_file'): + tdLog.exit("taos -C return firstEp error!") + + if (totalCfgItem["rpcDebugFlag"][2] != self.rpcDebugFlagVal) and (totalCfgItem["rpcDebugFlag"][0] != 'cfg_file'): + tdLog.exit("taos -C return rpcDebugFlag error!") + + count = os.cpu_count() + if (totalCfgItem["numOfCores"][2] != count) and (totalCfgItem["numOfCores"][0] != 'default'): + tdLog.exit("taos -C return numOfCores error!") def stop(self): tdSql.close() diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 30477722ab..9a4f780ab2 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -1,6 +1,9 @@ #!/bin/bash set -e +#python3 ./test.py -f 0-others/taosShell.py + + #python3 ./test.py -f 2-query/between.py #python3 ./test.py -f 2-query/distinct.py python3 ./test.py -f 2-query/varchar.py diff --git a/tools/shell/inc/shellInt.h b/tools/shell/inc/shellInt.h index d218bbf373..382009905c 100644 --- a/tools/shell/inc/shellInt.h +++ b/tools/shell/inc/shellInt.h @@ -111,6 +111,5 @@ void shellTestNetWork(); // shellMain.c extern SShellObj shell; -extern void taos_init(); #endif /*_TD_SHELL_INT_H_*/