Merge branch '3.0' into feature/tq
This commit is contained in:
commit
c3b3779713
|
@ -83,11 +83,6 @@ if(${BUILD_WITH_NURAFT})
|
||||||
cat("${CMAKE_SUPPORT_DIR}/nuraft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
cat("${CMAKE_SUPPORT_DIR}/nuraft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||||
endif(${BUILD_WITH_NURAFT})
|
endif(${BUILD_WITH_NURAFT})
|
||||||
|
|
||||||
# iconv
|
|
||||||
if(${BUILD_WITH_ICONV})
|
|
||||||
cat("${CMAKE_SUPPORT_DIR}/iconv_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
|
||||||
endif(${BUILD_WITH_ICONV})
|
|
||||||
|
|
||||||
# download dependencies
|
# download dependencies
|
||||||
configure_file(${CONTRIB_TMP_FILE} "${CMAKE_CONTRIB_DIR}/deps-download/CMakeLists.txt")
|
configure_file(${CONTRIB_TMP_FILE} "${CMAKE_CONTRIB_DIR}/deps-download/CMakeLists.txt")
|
||||||
execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" .
|
execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" .
|
||||||
|
@ -213,10 +208,9 @@ endif(${BUILD_WITH_TRAFT})
|
||||||
|
|
||||||
# LIBUV
|
# LIBUV
|
||||||
if(${BUILD_WITH_UV})
|
if(${BUILD_WITH_UV})
|
||||||
if (${TD_WINDOWS})
|
if (NOT ${CMAKE_SYSTEM_NAME} MATCHES "Windows")
|
||||||
file(READ "libuv/include/uv.h" CONTENTS)
|
MESSAGE("Windows need set no-sign-compare")
|
||||||
string(REGEX REPLACE "/([\r]*)\nstruct uv_tcp_s {" "/\\1\ntypedef BOOL (PASCAL *LPFN_CONNECTEX) (SOCKET s, const struct sockaddr* name, int namelen, PVOID lpSendBuffer, DWORD dwSendDataLength,LPDWORD lpdwBytesSent, LPOVERLAPPED lpOverlapped);\\1\nstruct uv_tcp_s {" CONTENTS_NEW "${CONTENTS}")
|
add_compile_options(-Wno-sign-compare)
|
||||||
file(WRITE "libuv/include/uv.h" "${CONTENTS_NEW}")
|
|
||||||
endif ()
|
endif ()
|
||||||
add_subdirectory(libuv)
|
add_subdirectory(libuv)
|
||||||
endif(${BUILD_WITH_UV})
|
endif(${BUILD_WITH_UV})
|
||||||
|
@ -249,15 +243,7 @@ if(${BUILD_WITH_SQLITE})
|
||||||
endif(${BUILD_WITH_SQLITE})
|
endif(${BUILD_WITH_SQLITE})
|
||||||
|
|
||||||
# pthread
|
# pthread
|
||||||
if(${BUILD_PTHREAD})
|
|
||||||
add_definitions(-DPTW32_STATIC_LIB)
|
|
||||||
add_subdirectory(pthread)
|
|
||||||
endif(${BUILD_PTHREAD})
|
|
||||||
|
|
||||||
# iconv
|
|
||||||
if(${BUILD_WITH_ICONV})
|
|
||||||
add_subdirectory(iconv)
|
|
||||||
endif(${BUILD_WITH_ICONV})
|
|
||||||
|
|
||||||
# ================================================================================================
|
# ================================================================================================
|
||||||
# Build test
|
# Build test
|
||||||
|
|
|
@ -120,7 +120,7 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf);
|
||||||
SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount);
|
SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount);
|
||||||
|
|
||||||
size_t blockDataGetSize(const SSDataBlock* pBlock);
|
size_t blockDataGetSize(const SSDataBlock* pBlock);
|
||||||
size_t blockDataGetRowSize(const SSDataBlock* pBlock);
|
size_t blockDataGetRowSize(SSDataBlock* pBlock);
|
||||||
double blockDataGetSerialRowSize(const SSDataBlock* pBlock);
|
double blockDataGetSerialRowSize(const SSDataBlock* pBlock);
|
||||||
size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock);
|
size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock);
|
||||||
|
|
||||||
|
@ -131,7 +131,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF
|
||||||
|
|
||||||
int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows);
|
int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows);
|
||||||
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
|
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
|
||||||
void blockDataClearup(SSDataBlock* pDataBlock);
|
void blockDataCleanup(SSDataBlock* pDataBlock);
|
||||||
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock);
|
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock);
|
||||||
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
|
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
|
||||||
void* blockDataDestroy(SSDataBlock* pBlock);
|
void* blockDataDestroy(SSDataBlock* pBlock);
|
||||||
|
|
|
@ -1362,13 +1362,27 @@ typedef struct {
|
||||||
int64_t tuid;
|
int64_t tuid;
|
||||||
} SDDropTopicReq;
|
} SDDropTopicReq;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
float xFilesFactor;
|
||||||
|
int8_t delayUnit;
|
||||||
|
int8_t nFuncIds;
|
||||||
|
int32_t* pFuncIds;
|
||||||
|
int64_t delay;
|
||||||
|
} SRSmaParam;
|
||||||
|
|
||||||
typedef struct SVCreateTbReq {
|
typedef struct SVCreateTbReq {
|
||||||
int64_t ver; // use a general definition
|
int64_t ver; // use a general definition
|
||||||
char* dbFName;
|
char* dbFName;
|
||||||
char* name;
|
char* name;
|
||||||
uint32_t ttl;
|
uint32_t ttl;
|
||||||
uint32_t keep;
|
uint32_t keep;
|
||||||
uint8_t type;
|
union {
|
||||||
|
uint8_t info;
|
||||||
|
struct {
|
||||||
|
uint8_t rollup : 1; // 1 means rollup sma
|
||||||
|
uint8_t type : 7;
|
||||||
|
};
|
||||||
|
};
|
||||||
union {
|
union {
|
||||||
struct {
|
struct {
|
||||||
tb_uid_t suid;
|
tb_uid_t suid;
|
||||||
|
@ -1376,6 +1390,9 @@ typedef struct SVCreateTbReq {
|
||||||
SSchema* pSchema;
|
SSchema* pSchema;
|
||||||
uint32_t nTagCols;
|
uint32_t nTagCols;
|
||||||
SSchema* pTagSchema;
|
SSchema* pTagSchema;
|
||||||
|
col_id_t nBSmaCols;
|
||||||
|
col_id_t* pBSmaCols;
|
||||||
|
SRSmaParam* pRSmaParam;
|
||||||
} stbCfg;
|
} stbCfg;
|
||||||
struct {
|
struct {
|
||||||
tb_uid_t suid;
|
tb_uid_t suid;
|
||||||
|
@ -1384,6 +1401,9 @@ typedef struct SVCreateTbReq {
|
||||||
struct {
|
struct {
|
||||||
uint32_t nCols;
|
uint32_t nCols;
|
||||||
SSchema* pSchema;
|
SSchema* pSchema;
|
||||||
|
col_id_t nBSmaCols;
|
||||||
|
col_id_t* pBSmaCols;
|
||||||
|
SRSmaParam* pRSmaParam;
|
||||||
} ntbCfg;
|
} ntbCfg;
|
||||||
};
|
};
|
||||||
} SVCreateTbReq, SVUpdateTbReq;
|
} SVCreateTbReq, SVUpdateTbReq;
|
||||||
|
|
|
@ -968,10 +968,14 @@ static FORCE_INLINE int32_t tdGetColDataOfRow(SCellVal *pVal, SDataCol *pCol, in
|
||||||
#endif
|
#endif
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
if (tdGetBitmapValType(pCol->pBitmap, row, &(pVal->valType)) < 0) {
|
|
||||||
|
if (TD_COL_ROWS_NORM(pCol)) {
|
||||||
|
pVal->valType = TD_VTYPE_NORM;
|
||||||
|
} else if (tdGetBitmapValType(pCol->pBitmap, row, &(pVal->valType)) < 0) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
if (TD_COL_ROWS_NORM(pCol) || tdValTypeIsNorm(pVal->valType)) {
|
|
||||||
|
if (tdValTypeIsNorm(pVal->valType)) {
|
||||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||||
pVal->val = POINTER_SHIFT(pCol->pData, pCol->dataOff[row]);
|
pVal->val = POINTER_SHIFT(pCol->pData, pCol->dataOff[row]);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -46,7 +46,7 @@ typedef struct {
|
||||||
#define varDataCopy(dst, v) memcpy((dst), (void *)(v), varDataTLen(v))
|
#define varDataCopy(dst, v) memcpy((dst), (void *)(v), varDataTLen(v))
|
||||||
#define varDataLenByData(v) (*(VarDataLenT *)(((char *)(v)) - VARSTR_HEADER_SIZE))
|
#define varDataLenByData(v) (*(VarDataLenT *)(((char *)(v)) - VARSTR_HEADER_SIZE))
|
||||||
#define varDataSetLen(v, _len) (((VarDataLenT *)(v))[0] = (VarDataLenT)(_len))
|
#define varDataSetLen(v, _len) (((VarDataLenT *)(v))[0] = (VarDataLenT)(_len))
|
||||||
#define IS_VAR_DATA_TYPE(t) (((t) == TSDB_DATA_TYPE_BINARY) || ((t) == TSDB_DATA_TYPE_NCHAR))
|
#define IS_VAR_DATA_TYPE(t) (((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_NCHAR))
|
||||||
|
|
||||||
#define varDataNetLen(v) (htons(((VarDataLenT *)(v))[0]))
|
#define varDataNetLen(v) (htons(((VarDataLenT *)(v))[0]))
|
||||||
#define varDataNetTLen(v) (sizeof(VarDataLenT) + varDataNetLen(v))
|
#define varDataNetTLen(v) (sizeof(VarDataLenT) + varDataNetLen(v))
|
||||||
|
|
|
@ -141,6 +141,14 @@ typedef struct STscObj {
|
||||||
SAppInstInfo* pAppInfo;
|
SAppInstInfo* pAppInfo;
|
||||||
} STscObj;
|
} STscObj;
|
||||||
|
|
||||||
|
typedef struct SResultColumn {
|
||||||
|
union {
|
||||||
|
char* nullbitmap; // bitmap, one bit for each item in the list
|
||||||
|
int32_t* offset;
|
||||||
|
};
|
||||||
|
char* pData;
|
||||||
|
} SResultColumn;
|
||||||
|
|
||||||
typedef struct SReqResultInfo {
|
typedef struct SReqResultInfo {
|
||||||
const char* pRspMsg;
|
const char* pRspMsg;
|
||||||
const char* pData;
|
const char* pData;
|
||||||
|
@ -148,11 +156,12 @@ typedef struct SReqResultInfo {
|
||||||
uint32_t numOfCols;
|
uint32_t numOfCols;
|
||||||
int32_t* length;
|
int32_t* length;
|
||||||
TAOS_ROW row;
|
TAOS_ROW row;
|
||||||
char** pCol;
|
SResultColumn* pCol;
|
||||||
uint32_t numOfRows;
|
uint32_t numOfRows;
|
||||||
uint64_t totalRows;
|
uint64_t totalRows;
|
||||||
uint32_t current;
|
uint32_t current;
|
||||||
bool completed;
|
bool completed;
|
||||||
|
int32_t payloadLen;
|
||||||
} SReqResultInfo;
|
} SReqResultInfo;
|
||||||
|
|
||||||
typedef struct SShowReqInfo {
|
typedef struct SShowReqInfo {
|
||||||
|
@ -227,7 +236,7 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass,
|
||||||
|
|
||||||
void* doFetchRow(SRequestObj* pRequest);
|
void* doFetchRow(SRequestObj* pRequest);
|
||||||
|
|
||||||
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows);
|
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows);
|
||||||
|
|
||||||
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
|
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
|
||||||
|
|
||||||
|
|
|
@ -12,7 +12,7 @@
|
||||||
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
|
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
|
||||||
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
|
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
|
||||||
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
|
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
|
||||||
static void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp);
|
static int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp);
|
||||||
|
|
||||||
static bool stringLengthCheck(const char* str, size_t maxsize) {
|
static bool stringLengthCheck(const char* str, size_t maxsize) {
|
||||||
if (str == NULL) {
|
if (str == NULL) {
|
||||||
|
@ -556,13 +556,16 @@ void* doFetchRow(SRequestObj* pRequest) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
|
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
|
||||||
int32_t code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData);
|
pRequest->code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||||
pRequest->code = code;
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData);
|
||||||
|
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData);
|
|
||||||
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
|
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
|
||||||
pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
|
pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
|
||||||
|
|
||||||
|
@ -629,10 +632,23 @@ void* doFetchRow(SRequestObj* pRequest) {
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
|
for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
|
||||||
pResultInfo->row[i] = pResultInfo->pCol[i] + pResultInfo->fields[i].bytes * pResultInfo->current;
|
SResultColumn* pCol = &pResultInfo->pCol[i];
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
|
if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
|
||||||
pResultInfo->length[i] = varDataLen(pResultInfo->row[i]);
|
if (pCol->offset[pResultInfo->current] != -1) {
|
||||||
pResultInfo->row[i] = varDataVal(pResultInfo->row[i]);
|
char* pStart = pResultInfo->pCol[i].offset[pResultInfo->current] + pResultInfo->pCol[i].pData;
|
||||||
|
|
||||||
|
pResultInfo->length[i] = varDataLen(pStart);
|
||||||
|
pResultInfo->row[i] = varDataVal(pStart);
|
||||||
|
} else {
|
||||||
|
pResultInfo->row[i] = NULL;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (!colDataIsNull_f(pCol->nullbitmap, pResultInfo->current)) {
|
||||||
|
pResultInfo->row[i] = pResultInfo->pCol[i].pData + pResultInfo->fields[i].bytes * pResultInfo->current;
|
||||||
|
} else {
|
||||||
|
pResultInfo->row[i] = NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -640,30 +656,52 @@ _return:
|
||||||
return pResultInfo->row;
|
return pResultInfo->row;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doPrepareResPtr(SReqResultInfo* pResInfo) {
|
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
|
||||||
if (pResInfo->row == NULL) {
|
if (pResInfo->row == NULL) {
|
||||||
pResInfo->row = calloc(pResInfo->numOfCols, POINTER_BYTES);
|
pResInfo->row = calloc(pResInfo->numOfCols, POINTER_BYTES);
|
||||||
pResInfo->pCol = calloc(pResInfo->numOfCols, POINTER_BYTES);
|
pResInfo->pCol = calloc(pResInfo->numOfCols, sizeof(SResultColumn));
|
||||||
pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t));
|
pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
} else {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) {
|
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) {
|
||||||
assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
|
assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
|
||||||
if (numOfRows == 0) {
|
if (numOfRows == 0) {
|
||||||
return;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo check for the failure of malloc
|
int32_t code = doPrepareResPtr(pResultInfo);
|
||||||
doPrepareResPtr(pResultInfo);
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t offset = 0;
|
int32_t* colLength = (int32_t*)pResultInfo->pData;
|
||||||
|
char* pStart = ((char*)pResultInfo->pData) + sizeof(int32_t) * numOfCols;
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
pResultInfo->length[i] = pResultInfo->fields[i].bytes;
|
colLength[i] = htonl(colLength[i]);
|
||||||
pResultInfo->row[i] = (char*)(pResultInfo->pData + offset * pResultInfo->numOfRows);
|
|
||||||
pResultInfo->pCol[i] = pResultInfo->row[i];
|
if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
|
||||||
offset += pResultInfo->fields[i].bytes;
|
pResultInfo->pCol[i].offset = (int32_t*)pStart;
|
||||||
|
pStart += numOfRows * sizeof(int32_t);
|
||||||
|
} else {
|
||||||
|
pResultInfo->pCol[i].nullbitmap = pStart;
|
||||||
|
pStart += BitmapLen(pResultInfo->numOfRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pResultInfo->pCol[i].pData = pStart;
|
||||||
|
pResultInfo->length[i] = pResultInfo->fields[i].bytes;
|
||||||
|
pResultInfo->row[i] = pResultInfo->pCol[i].pData;
|
||||||
|
|
||||||
|
pStart += colLength[i];
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
char* getDbOfConnection(STscObj* pObj) {
|
char* getDbOfConnection(STscObj* pObj) {
|
||||||
|
@ -685,7 +723,7 @@ void setConnectionDB(STscObj* pTscObj, const char* db) {
|
||||||
taosThreadMutexUnlock(&pTscObj->mutex);
|
taosThreadMutexUnlock(&pTscObj->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) {
|
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) {
|
||||||
assert(pResultInfo != NULL && pRsp != NULL);
|
assert(pResultInfo != NULL && pRsp != NULL);
|
||||||
|
|
||||||
pResultInfo->pRspMsg = (const char*)pRsp;
|
pResultInfo->pRspMsg = (const char*)pRsp;
|
||||||
|
@ -693,7 +731,9 @@ void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp*
|
||||||
pResultInfo->numOfRows = htonl(pRsp->numOfRows);
|
pResultInfo->numOfRows = htonl(pRsp->numOfRows);
|
||||||
pResultInfo->current = 0;
|
pResultInfo->current = 0;
|
||||||
pResultInfo->completed = (pRsp->completed == 1);
|
pResultInfo->completed = (pRsp->completed == 1);
|
||||||
|
pResultInfo->payloadLen = htonl(pRsp->compLen);
|
||||||
|
|
||||||
|
// TODO handle the compressed case
|
||||||
pResultInfo->totalRows += pResultInfo->numOfRows;
|
pResultInfo->totalRows += pResultInfo->numOfRows;
|
||||||
setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows);
|
return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows);
|
||||||
}
|
}
|
||||||
|
|
|
@ -285,17 +285,17 @@ TEST(testCase, create_stable_Test) {
|
||||||
ASSERT_EQ(numOfFields, 0);
|
ASSERT_EQ(numOfFields, 0);
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
pRes = taos_query(pConn, "create stable if not exists abc1.`123_$^)` (ts timestamp, `abc` int) tags(a int)");
|
// pRes = taos_query(pConn, "create stable if not exists abc1.`123_$^)` (ts timestamp, `abc` int) tags(a int)");
|
||||||
if (taos_errno(pRes) != 0) {
|
// if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));
|
// printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
pRes = taos_query(pConn, "use abc1");
|
// pRes = taos_query(pConn, "use abc1");
|
||||||
taos_free_result(pRes);
|
// taos_free_result(pRes);
|
||||||
pRes = taos_query(pConn, "drop stable `123_$^)`");
|
// pRes = taos_query(pConn, "drop stable `123_$^)`");
|
||||||
if (taos_errno(pRes) != 0) {
|
// if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to drop super table 123_$^), reason:%s\n", taos_errstr(pRes));
|
// printf("failed to drop super table 123_$^), reason:%s\n", taos_errstr(pRes));
|
||||||
}
|
// }
|
||||||
|
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
}
|
}
|
||||||
|
@ -335,7 +335,7 @@ TEST(testCase, create_ctable_Test) {
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
pRes = taos_query(pConn, "create table tu using sts tags('2021-10-10 1:1:1');");
|
pRes = taos_query(pConn, "create table tu using st1 tags('2021-10-10 1:1:1');");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes));
|
||||||
}
|
}
|
||||||
|
@ -485,7 +485,9 @@ TEST(testCase, show_table_Test) {
|
||||||
|
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
pRes = taos_query(pConn, "show abc1.tables");
|
taos_query(pConn, "use abc1");
|
||||||
|
|
||||||
|
pRes = taos_query(pConn, "show tables");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to show tables, reason:%s\n", taos_errstr(pRes));
|
printf("failed to show tables, reason:%s\n", taos_errstr(pRes));
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
@ -658,13 +660,7 @@ TEST(testCase, agg_query_tables) {
|
||||||
TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
pRes = taos_query(pConn, "create table tx using st1 tags(111111111111111)");
|
pRes = taos_query(pConn, "select count(*), sum(k),min(k),max(k) from tu");
|
||||||
if (taos_errno(pRes) != 0) {
|
|
||||||
printf("failed to create table, reason:%s\n", taos_errstr(pRes));
|
|
||||||
}
|
|
||||||
taos_free_result(pRes);
|
|
||||||
|
|
||||||
pRes = taos_query(pConn, "select count(*) from t_x_19");
|
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
|
printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
|
@ -497,11 +497,11 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
* +------------------+---------------+--------------------+
|
* +------------------+---------------------------------------------+
|
||||||
* |the number of rows| column length | column #1 |
|
* |the number of rows| column #1 |
|
||||||
* | (4 bytes) | (4 bytes) |--------------------+
|
* | (4 bytes) |------------+-----------------------+--------+
|
||||||
* | | | null bitmap| values|
|
* | | null bitmap| column length(4bytes) | values |
|
||||||
* +------------------+---------------+--------------------+
|
* +------------------+------------+-----------------------+--------+
|
||||||
* @param buf
|
* @param buf
|
||||||
* @param pBlock
|
* @param pBlock
|
||||||
* @return
|
* @return
|
||||||
|
@ -582,8 +582,9 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t blockDataGetRowSize(const SSDataBlock* pBlock) {
|
size_t blockDataGetRowSize(SSDataBlock* pBlock) {
|
||||||
ASSERT(pBlock != NULL);
|
ASSERT(pBlock != NULL);
|
||||||
|
if (pBlock->info.rowSize == 0) {
|
||||||
size_t rowSize = 0;
|
size_t rowSize = 0;
|
||||||
|
|
||||||
size_t numOfCols = pBlock->info.numOfCols;
|
size_t numOfCols = pBlock->info.numOfCols;
|
||||||
|
@ -592,7 +593,10 @@ size_t blockDataGetRowSize(const SSDataBlock* pBlock) {
|
||||||
rowSize += pColInfo->info.bytes;
|
rowSize += pColInfo->info.bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
return rowSize;
|
pBlock->info.rowSize = rowSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pBlock->info.rowSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -633,7 +637,7 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
|
||||||
if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
|
if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
|
||||||
rowSize += sizeof(int32_t);
|
rowSize += sizeof(int32_t);
|
||||||
} else {
|
} else {
|
||||||
rowSize += 1 / 8.0;
|
rowSize += 1/8.0; // one bit for each record
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1139,7 +1143,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void blockDataClearup(SSDataBlock* pDataBlock) {
|
void blockDataCleanup(SSDataBlock* pDataBlock) {
|
||||||
pDataBlock->info.rows = 0;
|
pDataBlock->info.rows = 0;
|
||||||
|
|
||||||
if (pDataBlock->info.hasVarCol) {
|
if (pDataBlock->info.hasVarCol) {
|
||||||
|
|
|
@ -290,7 +290,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
|
||||||
tlen += taosEncodeString(buf, pReq->name);
|
tlen += taosEncodeString(buf, pReq->name);
|
||||||
tlen += taosEncodeFixedU32(buf, pReq->ttl);
|
tlen += taosEncodeFixedU32(buf, pReq->ttl);
|
||||||
tlen += taosEncodeFixedU32(buf, pReq->keep);
|
tlen += taosEncodeFixedU32(buf, pReq->keep);
|
||||||
tlen += taosEncodeFixedU8(buf, pReq->type);
|
tlen += taosEncodeFixedU8(buf, pReq->info);
|
||||||
|
|
||||||
switch (pReq->type) {
|
switch (pReq->type) {
|
||||||
case TD_SUPER_TABLE:
|
case TD_SUPER_TABLE:
|
||||||
|
@ -309,6 +309,20 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
|
||||||
tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pTagSchema[i].bytes);
|
tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pTagSchema[i].bytes);
|
||||||
tlen += taosEncodeString(buf, pReq->stbCfg.pTagSchema[i].name);
|
tlen += taosEncodeString(buf, pReq->stbCfg.pTagSchema[i].name);
|
||||||
}
|
}
|
||||||
|
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nBSmaCols);
|
||||||
|
for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) {
|
||||||
|
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pBSmaCols[i]);
|
||||||
|
}
|
||||||
|
if(pReq->rollup && NULL != pReq->stbCfg.pRSmaParam) {
|
||||||
|
SRSmaParam *param = pReq->stbCfg.pRSmaParam;
|
||||||
|
tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor);
|
||||||
|
tlen += taosEncodeFixedI8(buf, param->delayUnit);
|
||||||
|
tlen += taosEncodeFixedI8(buf, param->nFuncIds);
|
||||||
|
for(int8_t i=0; i< param->nFuncIds; ++i) {
|
||||||
|
tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]);
|
||||||
|
}
|
||||||
|
tlen += taosEncodeFixedI64(buf, param->delay);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case TD_CHILD_TABLE:
|
case TD_CHILD_TABLE:
|
||||||
tlen += taosEncodeFixedI64(buf, pReq->ctbCfg.suid);
|
tlen += taosEncodeFixedI64(buf, pReq->ctbCfg.suid);
|
||||||
|
@ -322,6 +336,20 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
|
||||||
tlen += taosEncodeFixedI32(buf, pReq->ntbCfg.pSchema[i].bytes);
|
tlen += taosEncodeFixedI32(buf, pReq->ntbCfg.pSchema[i].bytes);
|
||||||
tlen += taosEncodeString(buf, pReq->ntbCfg.pSchema[i].name);
|
tlen += taosEncodeString(buf, pReq->ntbCfg.pSchema[i].name);
|
||||||
}
|
}
|
||||||
|
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nBSmaCols);
|
||||||
|
for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) {
|
||||||
|
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pBSmaCols[i]);
|
||||||
|
}
|
||||||
|
if(pReq->rollup && NULL != pReq->stbCfg.pRSmaParam) {
|
||||||
|
SRSmaParam *param = pReq->stbCfg.pRSmaParam;
|
||||||
|
tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor);
|
||||||
|
tlen += taosEncodeFixedI8(buf, param->delayUnit);
|
||||||
|
tlen += taosEncodeFixedI8(buf, param->nFuncIds);
|
||||||
|
for(int8_t i=0; i< param->nFuncIds; ++i) {
|
||||||
|
tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]);
|
||||||
|
}
|
||||||
|
tlen += taosEncodeFixedI64(buf, param->delay);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -335,7 +363,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
|
||||||
buf = taosDecodeString(buf, &(pReq->name));
|
buf = taosDecodeString(buf, &(pReq->name));
|
||||||
buf = taosDecodeFixedU32(buf, &(pReq->ttl));
|
buf = taosDecodeFixedU32(buf, &(pReq->ttl));
|
||||||
buf = taosDecodeFixedU32(buf, &(pReq->keep));
|
buf = taosDecodeFixedU32(buf, &(pReq->keep));
|
||||||
buf = taosDecodeFixedU8(buf, &(pReq->type));
|
buf = taosDecodeFixedU8(buf, &(pReq->info));
|
||||||
|
|
||||||
switch (pReq->type) {
|
switch (pReq->type) {
|
||||||
case TD_SUPER_TABLE:
|
case TD_SUPER_TABLE:
|
||||||
|
@ -356,6 +384,32 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
|
||||||
buf = taosDecodeFixedI32(buf, &pReq->stbCfg.pTagSchema[i].bytes);
|
buf = taosDecodeFixedI32(buf, &pReq->stbCfg.pTagSchema[i].bytes);
|
||||||
buf = taosDecodeStringTo(buf, pReq->stbCfg.pTagSchema[i].name);
|
buf = taosDecodeStringTo(buf, pReq->stbCfg.pTagSchema[i].name);
|
||||||
}
|
}
|
||||||
|
buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nBSmaCols));
|
||||||
|
if(pReq->stbCfg.nBSmaCols > 0) {
|
||||||
|
pReq->stbCfg.pBSmaCols = (col_id_t *)malloc(pReq->stbCfg.nBSmaCols * sizeof(col_id_t));
|
||||||
|
for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) {
|
||||||
|
buf = taosDecodeFixedI16(buf, pReq->stbCfg.pBSmaCols + i);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
pReq->stbCfg.pBSmaCols = NULL;
|
||||||
|
}
|
||||||
|
if(pReq->rollup) {
|
||||||
|
pReq->stbCfg.pRSmaParam = (SRSmaParam *)malloc(sizeof(SRSmaParam));
|
||||||
|
SRSmaParam *param = pReq->stbCfg.pRSmaParam;
|
||||||
|
buf = taosDecodeFixedU32(buf, (uint32_t*)¶m->xFilesFactor);
|
||||||
|
buf = taosDecodeFixedI8(buf, ¶m->delayUnit);
|
||||||
|
buf = taosDecodeFixedI8(buf, ¶m->nFuncIds);
|
||||||
|
if(param->nFuncIds > 0) {
|
||||||
|
for (int8_t i = 0; i< param->nFuncIds; ++i) {
|
||||||
|
buf = taosDecodeFixedI32(buf, param->pFuncIds + i);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
param->pFuncIds = NULL;
|
||||||
|
}
|
||||||
|
buf = taosDecodeFixedI64(buf, ¶m->delay);
|
||||||
|
} else {
|
||||||
|
pReq->stbCfg.pRSmaParam = NULL;
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case TD_CHILD_TABLE:
|
case TD_CHILD_TABLE:
|
||||||
buf = taosDecodeFixedI64(buf, &pReq->ctbCfg.suid);
|
buf = taosDecodeFixedI64(buf, &pReq->ctbCfg.suid);
|
||||||
|
@ -370,6 +424,32 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
|
||||||
buf = taosDecodeFixedI32(buf, &pReq->ntbCfg.pSchema[i].bytes);
|
buf = taosDecodeFixedI32(buf, &pReq->ntbCfg.pSchema[i].bytes);
|
||||||
buf = taosDecodeStringTo(buf, pReq->ntbCfg.pSchema[i].name);
|
buf = taosDecodeStringTo(buf, pReq->ntbCfg.pSchema[i].name);
|
||||||
}
|
}
|
||||||
|
buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nBSmaCols));
|
||||||
|
if(pReq->stbCfg.nBSmaCols > 0) {
|
||||||
|
pReq->stbCfg.pBSmaCols = (col_id_t *)malloc(pReq->stbCfg.nBSmaCols * sizeof(col_id_t));
|
||||||
|
for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) {
|
||||||
|
buf = taosDecodeFixedI16(buf, pReq->stbCfg.pBSmaCols + i);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
pReq->stbCfg.pBSmaCols = NULL;
|
||||||
|
}
|
||||||
|
if(pReq->rollup) {
|
||||||
|
pReq->stbCfg.pRSmaParam = (SRSmaParam *)malloc(sizeof(SRSmaParam));
|
||||||
|
SRSmaParam *param = pReq->stbCfg.pRSmaParam;
|
||||||
|
buf = taosDecodeFixedU32(buf, (uint32_t*)¶m->xFilesFactor);
|
||||||
|
buf = taosDecodeFixedI8(buf, ¶m->delayUnit);
|
||||||
|
buf = taosDecodeFixedI8(buf, ¶m->nFuncIds);
|
||||||
|
if(param->nFuncIds > 0) {
|
||||||
|
for (int8_t i = 0; i< param->nFuncIds; ++i) {
|
||||||
|
buf = taosDecodeFixedI32(buf, param->pFuncIds + i);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
param->pFuncIds = NULL;
|
||||||
|
}
|
||||||
|
buf = taosDecodeFixedI64(buf, ¶m->delay);
|
||||||
|
} else {
|
||||||
|
pReq->stbCfg.pRSmaParam = NULL;
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
|
|
@ -343,7 +343,7 @@ void vmGetMgmtFp(SMgmtWrapper *pWrapper) {
|
||||||
mgmtFp.requiredFp = vmRequire;
|
mgmtFp.requiredFp = vmRequire;
|
||||||
|
|
||||||
vmInitMsgHandles(pWrapper);
|
vmInitMsgHandles(pWrapper);
|
||||||
pWrapper->name = "vnodes";
|
pWrapper->name = "vnode";
|
||||||
pWrapper->fp = mgmtFp;
|
pWrapper->fp = mgmtFp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,10 +16,13 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndInfoSchema.h"
|
#include "mndInfoSchema.h"
|
||||||
|
|
||||||
|
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
||||||
|
#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
||||||
|
|
||||||
static const SInfosTableSchema dnodesSchema[] = {{.name = "id", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
|
static const SInfosTableSchema dnodesSchema[] = {{.name = "id", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
|
||||||
{.name = "endpoint", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "endpoint", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "vnodes", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
|
{.name = "vnodes", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
|
||||||
{.name = "max_vnodes",.bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
|
{.name = "max_vnodes", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
|
||||||
{.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||||
{.name = "note", .bytes = 256 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "note", .bytes = 256 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
|
@ -38,7 +41,7 @@ static const SInfosTableSchema qnodesSchema[] = {{.name = "id", .byt
|
||||||
{.name = "endpoint", .bytes = 134, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "endpoint", .bytes = 134, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||||
};
|
};
|
||||||
static const SInfosTableSchema userDBSchema[] = {{.name = "name", .bytes = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
static const SInfosTableSchema userDBSchema[] = {{.name = "name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||||
{.name = "vgroups", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
|
{.name = "vgroups", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
|
||||||
{.name = "ntables", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
{.name = "ntables", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
||||||
|
@ -64,15 +67,15 @@ static const SInfosTableSchema userFuncSchema[] = {{.name = "name", .b
|
||||||
{.name = "status", .bytes = 10, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "status", .bytes = 10, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
};
|
};
|
||||||
static const SInfosTableSchema userIdxSchema[] = {{.name = "db_name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY},
|
static const SInfosTableSchema userIdxSchema[] = {{.name = "db_name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "table_name", .bytes = 192, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "table_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "index_database", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "index_database", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "index_name", .bytes = 192, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "index_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "column_name", .bytes = 64, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "column_name", .bytes = 64, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "index_type", .bytes = 10, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "index_type", .bytes = 10, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "index_extensions", .bytes = 256, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "index_extensions", .bytes = 256, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
};
|
};
|
||||||
static const SInfosTableSchema userStbsSchema[] = {{.name = "db_name", .bytes = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
static const SInfosTableSchema userStbsSchema[] = {{.name = "stable_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "stable_name", .bytes = (TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||||
{.name = "columns", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "columns", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "tags", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "tags", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
|
@ -80,25 +83,25 @@ static const SInfosTableSchema userStbsSchema[] = {{.name = "db_name", .
|
||||||
{.name = "last_update", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
{.name = "last_update", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||||
{.name = "table_comment", .bytes = 1024 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_INT},
|
{.name = "table_comment", .bytes = 1024 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_INT},
|
||||||
};
|
};
|
||||||
static const SInfosTableSchema userStreamsSchema[] = {{.name = "stream_name", .bytes = 192, .type = TSDB_DATA_TYPE_BINARY},
|
static const SInfosTableSchema userStreamsSchema[] = {{.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "user_name", .bytes = 23, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "user_name", .bytes = 23, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "dest_table", .bytes = 192, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "dest_table", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||||
{.name = "sql", .bytes = 1024, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "sql", .bytes = 1024, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
};
|
};
|
||||||
static const SInfosTableSchema userTblsSchema[] = {
|
static const SInfosTableSchema userTblsSchema[] = {
|
||||||
{.name = "table_name", .bytes = 192, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "db_name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||||
{.name = "columns", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "columns", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "stable_name", .bytes = 192, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "stable_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "uid", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
{.name = "uid", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
||||||
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "ttl", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "ttl", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "table_comment", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "table_comment", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
};
|
};
|
||||||
static const SInfosTableSchema userTblDistSchema[] = {{.name = "db_name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY},
|
static const SInfosTableSchema userTblDistSchema[] = {{.name = "db_name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "table_name", .bytes = 192, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "table_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "distributed_histogram", .bytes = 500, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "distributed_histogram", .bytes = 500, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "min_of_rows", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "min_of_rows", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "max_of_rows", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "max_of_rows", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
|
@ -117,7 +120,7 @@ static const SInfosTableSchema userUsersSchema[] = {{.name = "name", .
|
||||||
{.name = "account", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "account", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
};
|
};
|
||||||
static const SInfosTableSchema vgroupsSchema[] = {{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
static const SInfosTableSchema vgroupsSchema[] = {{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "db_name", .bytes = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "tables", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "tables", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "onlines", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "onlines", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
|
|
|
@ -1534,20 +1534,19 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32
|
||||||
cols = 0;
|
cols = 0;
|
||||||
|
|
||||||
SName name = {0};
|
SName name = {0};
|
||||||
char db[TSDB_DB_NAME_LEN] = {0};
|
|
||||||
tNameFromString(&name, pStb->db, T_NAME_ACCT|T_NAME_DB);
|
|
||||||
tNameGetDbName(&name, db);
|
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
|
||||||
STR_TO_VARSTR(pWrite, db);
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
char stbName[TSDB_TABLE_NAME_LEN] = {0};
|
char stbName[TSDB_TABLE_NAME_LEN] = {0};
|
||||||
mndExtractTableName(pStb->name, stbName);
|
mndExtractTableName(pStb->name, stbName);
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
STR_TO_VARSTR(pWrite, stbName);
|
STR_TO_VARSTR(pWrite, stbName);
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
|
char db[TSDB_DB_NAME_LEN] = {0};
|
||||||
|
tNameFromString(&name, pStb->db, T_NAME_ACCT|T_NAME_DB);
|
||||||
|
tNameGetDbName(&name, db);
|
||||||
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
|
STR_TO_VARSTR(pWrite, db);
|
||||||
|
cols++;
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
*(int64_t *)pWrite = pStb->createdTime;
|
*(int64_t *)pWrite = pStb->createdTime;
|
||||||
cols++;
|
cols++;
|
||||||
|
|
|
@ -24,12 +24,12 @@ extern "C" {
|
||||||
|
|
||||||
extern int32_t tsdbDebugFlag;
|
extern int32_t tsdbDebugFlag;
|
||||||
|
|
||||||
#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
|
#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TSDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
|
||||||
#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
|
#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TSDB ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
|
||||||
#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
|
#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TSDB WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
|
||||||
#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TDB ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
|
#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TSDB ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
|
||||||
#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -507,7 +507,7 @@ static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) {
|
||||||
tsize += taosEncodeString(buf, pTbCfg->name);
|
tsize += taosEncodeString(buf, pTbCfg->name);
|
||||||
tsize += taosEncodeFixedU32(buf, pTbCfg->ttl);
|
tsize += taosEncodeFixedU32(buf, pTbCfg->ttl);
|
||||||
tsize += taosEncodeFixedU32(buf, pTbCfg->keep);
|
tsize += taosEncodeFixedU32(buf, pTbCfg->keep);
|
||||||
tsize += taosEncodeFixedU8(buf, pTbCfg->type);
|
tsize += taosEncodeFixedU8(buf, pTbCfg->info);
|
||||||
|
|
||||||
if (pTbCfg->type == META_SUPER_TABLE) {
|
if (pTbCfg->type == META_SUPER_TABLE) {
|
||||||
SSchemaWrapper sw = {.nCols = pTbCfg->stbCfg.nTagCols, .pSchema = pTbCfg->stbCfg.pTagSchema};
|
SSchemaWrapper sw = {.nCols = pTbCfg->stbCfg.nTagCols, .pSchema = pTbCfg->stbCfg.pTagSchema};
|
||||||
|
@ -527,7 +527,7 @@ static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg) {
|
||||||
buf = taosDecodeString(buf, &(pTbCfg->name));
|
buf = taosDecodeString(buf, &(pTbCfg->name));
|
||||||
buf = taosDecodeFixedU32(buf, &(pTbCfg->ttl));
|
buf = taosDecodeFixedU32(buf, &(pTbCfg->ttl));
|
||||||
buf = taosDecodeFixedU32(buf, &(pTbCfg->keep));
|
buf = taosDecodeFixedU32(buf, &(pTbCfg->keep));
|
||||||
buf = taosDecodeFixedU8(buf, &(pTbCfg->type));
|
buf = taosDecodeFixedU8(buf, &(pTbCfg->info));
|
||||||
|
|
||||||
if (pTbCfg->type == META_SUPER_TABLE) {
|
if (pTbCfg->type == META_SUPER_TABLE) {
|
||||||
SSchemaWrapper sw;
|
SSchemaWrapper sw;
|
||||||
|
|
|
@ -727,9 +727,9 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pBlockCol->colId == pDataCol->colId);
|
ASSERT(pBlockCol->colId == pDataCol->colId);
|
||||||
|
}
|
||||||
// set the bitmap
|
// set the bitmap
|
||||||
pDataCol->bitmap = pBlockCol->bitmap;
|
pDataCol->bitmap = pBlockCol->bitmap;
|
||||||
}
|
|
||||||
|
|
||||||
if (tsdbLoadColData(pReadh, pDFile, pBlock, pBlockCol, pDataCol) < 0) return -1;
|
if (tsdbLoadColData(pReadh, pDFile, pBlock, pBlockCol, pDataCol) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,61 @@ int main(int argc, char **argv) {
|
||||||
return RUN_ALL_TESTS();
|
return RUN_ALL_TESTS();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(testCase, unionEncodeDecodeTest) {
|
||||||
|
typedef struct {
|
||||||
|
union {
|
||||||
|
uint8_t info;
|
||||||
|
struct {
|
||||||
|
uint8_t rollup : 1; // 1 means rollup sma
|
||||||
|
uint8_t type : 7;
|
||||||
|
};
|
||||||
|
};
|
||||||
|
col_id_t nBSmaCols;
|
||||||
|
col_id_t* pBSmaCols;
|
||||||
|
} SUnionTest;
|
||||||
|
|
||||||
|
SUnionTest sut = {0};
|
||||||
|
sut.rollup = 1;
|
||||||
|
sut.type = 1;
|
||||||
|
|
||||||
|
sut.nBSmaCols = 2;
|
||||||
|
sut.pBSmaCols = (col_id_t*)malloc(sut.nBSmaCols * sizeof(col_id_t));
|
||||||
|
for (col_id_t i = 0; i < sut.nBSmaCols; ++i) {
|
||||||
|
sut.pBSmaCols[i] = i + 100;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* buf = malloc(1024);
|
||||||
|
void * pBuf = buf;
|
||||||
|
int32_t tlen = 0;
|
||||||
|
tlen += taosEncodeFixedU8(&buf, sut.info);
|
||||||
|
tlen += taosEncodeFixedI16(&buf, sut.nBSmaCols);
|
||||||
|
for (col_id_t i = 0; i < sut.nBSmaCols; ++i) {
|
||||||
|
tlen += taosEncodeFixedI16(&buf, sut.pBSmaCols[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
SUnionTest dut = {0};
|
||||||
|
pBuf = taosDecodeFixedU8(pBuf, &dut.info);
|
||||||
|
pBuf = taosDecodeFixedI16(pBuf, &dut.nBSmaCols);
|
||||||
|
if(dut.nBSmaCols > 0) {
|
||||||
|
dut.pBSmaCols = (col_id_t*)malloc(dut.nBSmaCols * sizeof(col_id_t));
|
||||||
|
for(col_id_t i=0; i < dut.nBSmaCols; ++i) {
|
||||||
|
pBuf = taosDecodeFixedI16(pBuf, dut.pBSmaCols + i);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
dut.pBSmaCols = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
printf("sut.rollup=%" PRIu8 ", type=%" PRIu8 ", info=%" PRIu8 "\n", sut.rollup, sut.type, sut.info);
|
||||||
|
printf("dut.rollup=%" PRIu8 ", type=%" PRIu8 ", info=%" PRIu8 "\n", dut.rollup, dut.type, dut.info);
|
||||||
|
|
||||||
|
ASSERT_EQ(sut.rollup, dut.rollup);
|
||||||
|
ASSERT_EQ(sut.type, dut.type);
|
||||||
|
ASSERT_EQ(sut.nBSmaCols, dut.nBSmaCols);
|
||||||
|
for (col_id_t i = 0; i< sut.nBSmaCols; ++i) {
|
||||||
|
ASSERT_EQ(*(col_id_t*)(sut.pBSmaCols + i), sut.pBSmaCols[i]);
|
||||||
|
ASSERT_EQ(*(col_id_t*)(sut.pBSmaCols + i), dut.pBSmaCols[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
#if 1
|
#if 1
|
||||||
TEST(testCase, tSma_Meta_Encode_Decode_Test) {
|
TEST(testCase, tSma_Meta_Encode_Decode_Test) {
|
||||||
// encode
|
// encode
|
||||||
|
|
|
@ -65,37 +65,57 @@ static bool needCompress(const SSDataBlock* pData, const SDataBlockDescNode* pSc
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t compressColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) {
|
static int32_t compressColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) {
|
||||||
int32_t colSize = pColRes->info.bytes * numOfRows;
|
int32_t colSize = colDataGetLength(pColRes, numOfRows);
|
||||||
return (*(tDataTypes[pColRes->info.type].compFunc))(
|
return (*(tDataTypes[pColRes->info.type].compFunc))(
|
||||||
pColRes->pData, colSize, numOfRows, data, colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
|
pColRes->pData, colSize, numOfRows, data, colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema, char* data, int8_t compressed, int32_t *compLen) {
|
static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema, char* data, int8_t compressed, int32_t * dataLen) {
|
||||||
int32_t numOfCols = LIST_LENGTH(pSchema->pSlots);
|
int32_t numOfCols = LIST_LENGTH(pSchema->pSlots);
|
||||||
int32_t *compSizes = (int32_t*)data;
|
int32_t * colSizes = (int32_t*)data;
|
||||||
if (compressed) {
|
|
||||||
data += numOfCols * sizeof(int32_t);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
data += numOfCols * sizeof(int32_t);
|
||||||
|
*dataLen = (numOfCols * sizeof(int32_t));
|
||||||
|
|
||||||
|
int32_t numOfRows = pInput->pData->info.rows;
|
||||||
for (int32_t col = 0; col < numOfCols; ++col) {
|
for (int32_t col = 0; col < numOfCols; ++col) {
|
||||||
SColumnInfoData* pColRes = taosArrayGet(pInput->pData->pDataBlock, col);
|
SColumnInfoData* pColRes = taosArrayGet(pInput->pData->pDataBlock, col);
|
||||||
if (compressed) {
|
|
||||||
compSizes[col] = compressColData(pColRes, pInput->pData->info.rows, data, compressed);
|
// copy the null bitmap
|
||||||
data += compSizes[col];
|
if (IS_VAR_DATA_TYPE(pColRes->info.type)) {
|
||||||
*compLen += compSizes[col];
|
size_t metaSize = numOfRows * sizeof(int32_t);
|
||||||
compSizes[col] = htonl(compSizes[col]);
|
memcpy(data, pColRes->varmeta.offset, metaSize);
|
||||||
|
data += metaSize;
|
||||||
|
(*dataLen) += metaSize;
|
||||||
} else {
|
} else {
|
||||||
for(int32_t i = 0; i < pInput->pData->info.rows; ++i) {
|
int32_t len = BitmapLen(numOfRows);
|
||||||
char* pData = colDataGetData(pColRes, i);
|
memcpy(data, pColRes->nullbitmap, len);
|
||||||
memmove(data, pData, pColRes->info.bytes);
|
data += len;
|
||||||
data += pColRes->info.bytes;
|
(*dataLen) += len;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (compressed) {
|
||||||
|
colSizes[col] = compressColData(pColRes, numOfRows, data, compressed);
|
||||||
|
data += colSizes[col];
|
||||||
|
(*dataLen) += colSizes[col];
|
||||||
|
} else {
|
||||||
|
colSizes[col] = colDataGetLength(pColRes, numOfRows);
|
||||||
|
(*dataLen) += colSizes[col];
|
||||||
|
memmove(data, pColRes->pData, colSizes[col]);
|
||||||
|
data += colSizes[col];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
colSizes[col] = htonl(colSizes[col]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// data format with compress: SDataCacheEntry | cols_data_offset | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ...
|
// data format:
|
||||||
// data format: SDataCacheEntry | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ...
|
// +----------------+--------------------------------------+-------------+-----------+-------------+-----------+
|
||||||
|
// |SDataCacheEntry | column#1 length, column#2 length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | ....
|
||||||
|
// | | sizeof(int32_t) * numOfCols | actual size | | actual size | |
|
||||||
|
// +----------------+--------------------------------------+-------------+-----------+-------------+-----------+
|
||||||
|
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is
|
||||||
|
// recorded in the first segment, next to the struct header
|
||||||
static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
|
static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
|
||||||
SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
|
SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
|
||||||
pEntry->compressed = (int8_t)needCompress(pInput->pData, pHandle->pSchema);
|
pEntry->compressed = (int8_t)needCompress(pInput->pData, pHandle->pSchema);
|
||||||
|
@ -104,11 +124,9 @@ static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputDat
|
||||||
|
|
||||||
pBuf->useSize = sizeof(SRetrieveTableRsp);
|
pBuf->useSize = sizeof(SRetrieveTableRsp);
|
||||||
copyData(pInput, pHandle->pSchema, pEntry->data, pEntry->compressed, &pEntry->dataLen);
|
copyData(pInput, pHandle->pSchema, pEntry->data, pEntry->compressed, &pEntry->dataLen);
|
||||||
if (0 == pEntry->compressed) {
|
|
||||||
pEntry->dataLen = pHandle->pSchema->resultRowSize * pInput->pData->info.rows;
|
pEntry->dataLen = pEntry->dataLen;
|
||||||
}
|
|
||||||
pBuf->useSize += pEntry->dataLen;
|
pBuf->useSize += pEntry->dataLen;
|
||||||
// todo completed
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
|
static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
|
||||||
|
@ -119,8 +137,11 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// struct size + data payload + length for each column
|
// NOTE: there are four bytes of an integer more than the required buffer space.
|
||||||
pBuf->allocSize = sizeof(SRetrieveTableRsp) + pDispatcher->pSchema->resultRowSize * pInput->pData->info.rows + pInput->pData->info.numOfCols * sizeof(int32_t);
|
// struct size + data payload + length for each column + bitmap length
|
||||||
|
pBuf->allocSize = sizeof(SRetrieveTableRsp) + blockDataGetSerialMetaSize(pInput->pData) +
|
||||||
|
ceil(blockDataGetSerialRowSize(pInput->pData) * pInput->pData->info.rows);
|
||||||
|
|
||||||
pBuf->pData = malloc(pBuf->allocSize);
|
pBuf->pData = malloc(pBuf->allocSize);
|
||||||
if (pBuf->pData == NULL) {
|
if (pBuf->pData == NULL) {
|
||||||
qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno));
|
qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno));
|
||||||
|
@ -173,6 +194,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE
|
||||||
*pLen = 0;
|
*pLen = 0;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SDataDispatchBuf* pBuf = NULL;
|
SDataDispatchBuf* pBuf = NULL;
|
||||||
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
|
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
|
||||||
memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
|
memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
|
||||||
|
|
|
@ -3914,7 +3914,7 @@ static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResI
|
||||||
static void toSDatablock(SGroupResInfo *pGroupResInfo, SDiskbasedBuf* pBuf, SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset) {
|
static void toSDatablock(SGroupResInfo *pGroupResInfo, SDiskbasedBuf* pBuf, SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset) {
|
||||||
assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup);
|
assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup);
|
||||||
|
|
||||||
blockDataClearup(pBlock);
|
blockDataCleanup(pBlock);
|
||||||
if (!hasRemainDataInCurrentGroup(pGroupResInfo)) {
|
if (!hasRemainDataInCurrentGroup(pGroupResInfo)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -4737,7 +4737,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo *pOperator, bool* newgroup)
|
||||||
}
|
}
|
||||||
|
|
||||||
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
||||||
blockDataClearup(pInfo->pRes);
|
blockDataCleanup(pInfo->pRes);
|
||||||
|
|
||||||
while (tqNextDataBlock(pInfo->readerHandle)) {
|
while (tqNextDataBlock(pInfo->readerHandle)) {
|
||||||
pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo);
|
pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo);
|
||||||
|
@ -4852,17 +4852,35 @@ static int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo*
|
||||||
int32_t numOfOutput, int64_t startTs, uint64_t* total, SArray* pColList) {
|
int32_t numOfOutput, int64_t startTs, uint64_t* total, SArray* pColList) {
|
||||||
blockDataEnsureCapacity(pRes, numOfRows);
|
blockDataEnsureCapacity(pRes, numOfRows);
|
||||||
|
|
||||||
if (pColList == NULL) {
|
if (pColList == NULL) { // data from other sources
|
||||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
int32_t* colLen = (int32_t*)pData;
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i);
|
char* pStart = pData + sizeof(int32_t) * numOfOutput;
|
||||||
|
|
||||||
for (int32_t j = 0; j < numOfRows; ++j) {
|
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
colDataAppend(pColInfoData, j, pData, false);
|
colLen[i] = htonl(colLen[i]);
|
||||||
pData += pColInfoData->info.bytes;
|
ASSERT(colLen[i] > 0);
|
||||||
|
|
||||||
|
SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i);
|
||||||
|
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||||
|
pColInfoData->varmeta.length = colLen[i];
|
||||||
|
pColInfoData->varmeta.allocLen = colLen[i];
|
||||||
|
|
||||||
|
memcpy(pColInfoData->varmeta.offset, pStart, sizeof(int32_t)*numOfRows);
|
||||||
|
pStart += sizeof(int32_t)*numOfRows;
|
||||||
|
|
||||||
|
pColInfoData->pData = malloc(colLen[i]);
|
||||||
|
} else {
|
||||||
|
memcpy(pColInfoData->nullbitmap, pStart, BitmapLen(numOfRows));
|
||||||
|
pStart += BitmapLen(numOfRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
memcpy(pColInfoData->pData, pStart, colLen[i]);
|
||||||
|
pStart += colLen[i];
|
||||||
}
|
}
|
||||||
} else { // extract data acording to pColList
|
} else { // extract data according to pColList
|
||||||
ASSERT(numOfOutput == taosArrayGetSize(pColList));
|
ASSERT(numOfOutput == taosArrayGetSize(pColList));
|
||||||
|
|
||||||
|
// data from mnode
|
||||||
for(int32_t i = 0; i < numOfOutput; ++i) {
|
for(int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
|
|
||||||
for(int32_t j = 0; j < numOfOutput; ++j) {
|
for(int32_t j = 0; j < numOfOutput; ++j) {
|
||||||
|
@ -5458,9 +5476,10 @@ static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) {
|
||||||
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle);
|
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle);
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataClearup(pInfo->pRes);
|
blockDataCleanup(pInfo->pRes);
|
||||||
|
|
||||||
SColumnInfoData* pTableNameCol = taosArrayGet(pInfo->pRes->pDataBlock, 1);
|
int32_t tableNameSlotId = 1;
|
||||||
|
SColumnInfoData* pTableNameCol = taosArrayGet(pInfo->pRes->pDataBlock, tableNameSlotId);
|
||||||
|
|
||||||
char * name = NULL;
|
char * name = NULL;
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
|
@ -5475,7 +5494,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for(int32_t i = 0; i < pInfo->pRes->info.numOfCols; ++i) {
|
for(int32_t i = 0; i < pInfo->pRes->info.numOfCols; ++i) {
|
||||||
if (i == 1) {
|
if (i == tableNameSlotId) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5782,7 +5801,7 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, STupleHandle* pTupleHan
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, bool hasVarCol, int32_t capacity) {
|
static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, bool hasVarCol, int32_t capacity) {
|
||||||
blockDataClearup(pDataBlock);
|
blockDataCleanup(pDataBlock);
|
||||||
|
|
||||||
while(1) {
|
while(1) {
|
||||||
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
|
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
|
||||||
|
@ -5938,7 +5957,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
while(1) {
|
while(1) {
|
||||||
|
|
||||||
blockDataClearup(pDataBlock);
|
blockDataCleanup(pDataBlock);
|
||||||
while (1) {
|
while (1) {
|
||||||
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
|
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
|
||||||
if (pTupleHandle == NULL) {
|
if (pTupleHandle == NULL) {
|
||||||
|
@ -6354,8 +6373,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo *pOperator, bool* newgroup)
|
||||||
SOptrBasicInfo *pInfo = &pProjectInfo->binfo;
|
SOptrBasicInfo *pInfo = &pProjectInfo->binfo;
|
||||||
|
|
||||||
SSDataBlock* pRes = pInfo->pRes;
|
SSDataBlock* pRes = pInfo->pRes;
|
||||||
|
blockDataCleanup(pRes);
|
||||||
blockDataClearup(pRes);
|
|
||||||
|
|
||||||
if (pProjectInfo->existDataBlock) { // TODO refactor
|
if (pProjectInfo->existDataBlock) { // TODO refactor
|
||||||
// STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
|
// STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
|
||||||
|
|
|
@ -182,7 +182,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
|
||||||
start = stop + 1;
|
start = stop + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataClearup(pDataBlock);
|
blockDataCleanup(pDataBlock);
|
||||||
|
|
||||||
SSDataBlock* pBlock = createOneDataBlock(pDataBlock);
|
SSDataBlock* pBlock = createOneDataBlock(pDataBlock);
|
||||||
int32_t code = doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId);
|
int32_t code = doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId);
|
||||||
|
@ -312,7 +312,7 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SMsortComparParam* cmpParam, int32_t capacity) {
|
static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SMsortComparParam* cmpParam, int32_t capacity) {
|
||||||
blockDataClearup(pHandle->pDataBlock);
|
blockDataCleanup(pHandle->pDataBlock);
|
||||||
|
|
||||||
while(1) {
|
while(1) {
|
||||||
if (cmpParam->numOfSources == pHandle->numOfCompletedSources) {
|
if (cmpParam->numOfSources == pHandle->numOfCompletedSources) {
|
||||||
|
@ -478,7 +478,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
|
||||||
setBufPageDirty(pPage, true);
|
setBufPageDirty(pPage, true);
|
||||||
releaseBufPage(pHandle->pBuf, pPage);
|
releaseBufPage(pHandle->pBuf, pPage);
|
||||||
|
|
||||||
blockDataClearup(pDataBlock);
|
blockDataCleanup(pDataBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
tMergeTreeDestroy(pHandle->pMergeTree);
|
tMergeTreeDestroy(pHandle->pMergeTree);
|
||||||
|
|
|
@ -86,7 +86,7 @@ SSDataBlock* getDummyBlock(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
//
|
//
|
||||||
// taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo1);
|
// taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo1);
|
||||||
} else {
|
} else {
|
||||||
blockDataClearup(pInfo->pBlock);
|
blockDataCleanup(pInfo->pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* pBlock = pInfo->pBlock;
|
SSDataBlock* pBlock = pInfo->pBlock;
|
||||||
|
@ -151,7 +151,7 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
|
|
||||||
taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo1);
|
taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo1);
|
||||||
} else {
|
} else {
|
||||||
blockDataClearup(pInfo->pBlock);
|
blockDataCleanup(pInfo->pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* pBlock = pInfo->pBlock;
|
SSDataBlock* pBlock = pInfo->pBlock;
|
||||||
|
|
|
@ -23,7 +23,7 @@ extern "C" {
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
|
|
||||||
bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
void functionFinalizer(SqlFunctionCtx *pCtx);
|
void functionFinalize(SqlFunctionCtx *pCtx);
|
||||||
|
|
||||||
bool getCountFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getCountFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
void countFunction(SqlFunctionCtx *pCtx);
|
void countFunction(SqlFunctionCtx *pCtx);
|
||||||
|
@ -37,6 +37,10 @@ bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
void minFunction(SqlFunctionCtx* pCtx);
|
void minFunction(SqlFunctionCtx* pCtx);
|
||||||
void maxFunction(SqlFunctionCtx *pCtx);
|
void maxFunction(SqlFunctionCtx *pCtx);
|
||||||
|
|
||||||
|
bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
|
void stddevFunction(SqlFunctionCtx* pCtx);
|
||||||
|
void stddevFinalize(SqlFunctionCtx* pCtx);
|
||||||
|
|
||||||
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
void firstFunction(SqlFunctionCtx *pCtx);
|
void firstFunction(SqlFunctionCtx *pCtx);
|
||||||
void lastFunction(SqlFunctionCtx *pCtx);
|
void lastFunction(SqlFunctionCtx *pCtx);
|
||||||
|
|
|
@ -29,7 +29,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.getEnvFunc = getCountFuncEnv,
|
.getEnvFunc = getCountFuncEnv,
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
.processFunc = countFunction,
|
.processFunc = countFunction,
|
||||||
.finalizeFunc = functionFinalizer
|
.finalizeFunc = functionFinalize
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "sum",
|
.name = "sum",
|
||||||
|
@ -39,7 +39,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.getEnvFunc = getSumFuncEnv,
|
.getEnvFunc = getSumFuncEnv,
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
.processFunc = sumFunction,
|
.processFunc = sumFunction,
|
||||||
.finalizeFunc = functionFinalizer
|
.finalizeFunc = functionFinalize
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "min",
|
.name = "min",
|
||||||
|
@ -49,7 +49,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.getEnvFunc = getMinmaxFuncEnv,
|
.getEnvFunc = getMinmaxFuncEnv,
|
||||||
.initFunc = minFunctionSetup,
|
.initFunc = minFunctionSetup,
|
||||||
.processFunc = minFunction,
|
.processFunc = minFunction,
|
||||||
.finalizeFunc = functionFinalizer
|
.finalizeFunc = functionFinalize
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "max",
|
.name = "max",
|
||||||
|
@ -59,7 +59,77 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.getEnvFunc = getMinmaxFuncEnv,
|
.getEnvFunc = getMinmaxFuncEnv,
|
||||||
.initFunc = maxFunctionSetup,
|
.initFunc = maxFunctionSetup,
|
||||||
.processFunc = maxFunction,
|
.processFunc = maxFunction,
|
||||||
.finalizeFunc = functionFinalizer
|
.finalizeFunc = functionFinalize
|
||||||
|
},
|
||||||
|
{
|
||||||
|
.name = "stddev",
|
||||||
|
.type = FUNCTION_TYPE_STDDEV,
|
||||||
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
|
.checkFunc = stubCheckAndGetResultType,
|
||||||
|
.getEnvFunc = getStddevFuncEnv,
|
||||||
|
.initFunc = maxFunctionSetup,
|
||||||
|
.processFunc = maxFunction,
|
||||||
|
.finalizeFunc = functionFinalize
|
||||||
|
},
|
||||||
|
{
|
||||||
|
.name = "percentile",
|
||||||
|
.type = FUNCTION_TYPE_PERCENTILE,
|
||||||
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
|
.checkFunc = stubCheckAndGetResultType,
|
||||||
|
.getEnvFunc = getMinmaxFuncEnv,
|
||||||
|
.initFunc = maxFunctionSetup,
|
||||||
|
.processFunc = maxFunction,
|
||||||
|
.finalizeFunc = functionFinalize
|
||||||
|
},
|
||||||
|
{
|
||||||
|
.name = "apercentile",
|
||||||
|
.type = FUNCTION_TYPE_APERCENTILE,
|
||||||
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
|
.checkFunc = stubCheckAndGetResultType,
|
||||||
|
.getEnvFunc = getMinmaxFuncEnv,
|
||||||
|
.initFunc = maxFunctionSetup,
|
||||||
|
.processFunc = maxFunction,
|
||||||
|
.finalizeFunc = functionFinalize
|
||||||
|
},
|
||||||
|
{
|
||||||
|
.name = "top",
|
||||||
|
.type = FUNCTION_TYPE_TOP,
|
||||||
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
|
.checkFunc = stubCheckAndGetResultType,
|
||||||
|
.getEnvFunc = getMinmaxFuncEnv,
|
||||||
|
.initFunc = maxFunctionSetup,
|
||||||
|
.processFunc = maxFunction,
|
||||||
|
.finalizeFunc = functionFinalize
|
||||||
|
},
|
||||||
|
{
|
||||||
|
.name = "bottom",
|
||||||
|
.type = FUNCTION_TYPE_BOTTOM,
|
||||||
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
|
.checkFunc = stubCheckAndGetResultType,
|
||||||
|
.getEnvFunc = getMinmaxFuncEnv,
|
||||||
|
.initFunc = maxFunctionSetup,
|
||||||
|
.processFunc = maxFunction,
|
||||||
|
.finalizeFunc = functionFinalize
|
||||||
|
},
|
||||||
|
{
|
||||||
|
.name = "spread",
|
||||||
|
.type = FUNCTION_TYPE_SPREAD,
|
||||||
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
|
.checkFunc = stubCheckAndGetResultType,
|
||||||
|
.getEnvFunc = getMinmaxFuncEnv,
|
||||||
|
.initFunc = maxFunctionSetup,
|
||||||
|
.processFunc = maxFunction,
|
||||||
|
.finalizeFunc = functionFinalize
|
||||||
|
},
|
||||||
|
{
|
||||||
|
.name = "last_row",
|
||||||
|
.type = FUNCTION_TYPE_LAST_ROW,
|
||||||
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
|
.checkFunc = stubCheckAndGetResultType,
|
||||||
|
.getEnvFunc = getMinmaxFuncEnv,
|
||||||
|
.initFunc = maxFunctionSetup,
|
||||||
|
.processFunc = maxFunction,
|
||||||
|
.finalizeFunc = functionFinalize
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "first",
|
.name = "first",
|
||||||
|
@ -69,7 +139,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.getEnvFunc = getFirstLastFuncEnv,
|
.getEnvFunc = getFirstLastFuncEnv,
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
.processFunc = firstFunction,
|
.processFunc = firstFunction,
|
||||||
.finalizeFunc = functionFinalizer
|
.finalizeFunc = functionFinalize
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "last",
|
.name = "last",
|
||||||
|
@ -79,18 +149,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.getEnvFunc = getFirstLastFuncEnv,
|
.getEnvFunc = getFirstLastFuncEnv,
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
.processFunc = lastFunction,
|
.processFunc = lastFunction,
|
||||||
.finalizeFunc = functionFinalizer
|
.finalizeFunc = functionFinalize
|
||||||
},
|
},
|
||||||
// {
|
|
||||||
// .name = "valueAssigner",
|
|
||||||
// .type = FUNCTION_TYPE_ASSIGNER,
|
|
||||||
// .classification = FUNC_MGT_AGG_FUNC,
|
|
||||||
// .checkFunc = stubCheckAndGetResultType,
|
|
||||||
// .getEnvFunc = getFirstLastFuncEnv,
|
|
||||||
// .initFunc = functionSetup,
|
|
||||||
// .processFunc = valFunction,
|
|
||||||
// .finalizeFunc = functionFinalizer
|
|
||||||
// },
|
|
||||||
{
|
{
|
||||||
.name = "concat",
|
.name = "concat",
|
||||||
.type = FUNCTION_TYPE_CONCAT,
|
.type = FUNCTION_TYPE_CONCAT,
|
||||||
|
|
|
@ -50,7 +50,7 @@ bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||||
|
|
||||||
static void doFinalizer(SResultRowEntryInfo* pResInfo) { cleanupResultRowEntry(pResInfo); }
|
static void doFinalizer(SResultRowEntryInfo* pResInfo) { cleanupResultRowEntry(pResInfo); }
|
||||||
|
|
||||||
void functionFinalizer(SqlFunctionCtx *pCtx) {
|
void functionFinalize(SqlFunctionCtx *pCtx) {
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
doFinalizer(pResInfo);
|
doFinalizer(pResInfo);
|
||||||
}
|
}
|
||||||
|
@ -441,6 +441,74 @@ void maxFunction(SqlFunctionCtx *pCtx) {
|
||||||
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
|
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef struct STopBotRes {
|
||||||
|
int32_t num;
|
||||||
|
} STopBotRes;
|
||||||
|
|
||||||
|
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
|
SColumnNode* pColNode = (SColumnNode*) nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
|
int32_t bytes = pColNode->node.resType.bytes;
|
||||||
|
SValueNode* pkNode = (SValueNode*) nodesListGetNode(pFunc->pParameterList, 1);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
typedef struct SStddevRes {
|
||||||
|
int64_t count;
|
||||||
|
union {double quadraticDSum; int64_t quadraticISum;};
|
||||||
|
union {double dsum; int64_t isum;};
|
||||||
|
} SStddevRes;
|
||||||
|
|
||||||
|
bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
|
pEnv->calcMemSize = sizeof(SStddevRes);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void stddevFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
int32_t numOfElem = 0;
|
||||||
|
|
||||||
|
// Only the pre-computing information loaded and actual data does not loaded
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
SColumnDataAgg *pAgg = pInput->pColumnDataAgg[0];
|
||||||
|
int32_t type = pInput->pData[0]->info.type;
|
||||||
|
|
||||||
|
SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
|
||||||
|
// } else { // computing based on the true data block
|
||||||
|
SColumnInfoData* pCol = pInput->pData[0];
|
||||||
|
|
||||||
|
int32_t start = pInput->startRowIndex;
|
||||||
|
int32_t numOfRows = pInput->numOfRows;
|
||||||
|
|
||||||
|
switch(type) {
|
||||||
|
case TSDB_DATA_TYPE_INT: {
|
||||||
|
int32_t* plist = (int32_t*)pCol->pData;
|
||||||
|
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||||
|
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
pStddevRes->count += 1;
|
||||||
|
pStddevRes->isum += plist[i];
|
||||||
|
pStddevRes->quadraticISum += plist[i] * plist[i];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// data in the check operation are all null, not output
|
||||||
|
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
void stddevFinalize(SqlFunctionCtx* pCtx) {
|
||||||
|
functionFinalize(pCtx);
|
||||||
|
|
||||||
|
SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
double res = pStddevRes->quadraticISum/pStddevRes->count - (pStddevRes->isum / pStddevRes->count) * (pStddevRes->isum / pStddevRes->count);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0);
|
SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
pEnv->calcMemSize = pNode->node.resType.bytes;
|
pEnv->calcMemSize = pNode->node.resType.bytes;
|
||||||
|
|
|
@ -465,7 +465,7 @@ static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int
|
||||||
return func(&tmpVal, pSchema->bytes, param);
|
return func(&tmpVal, pSchema->bytes, param);
|
||||||
}
|
}
|
||||||
|
|
||||||
return func(getNullValue(pSchema->type), 0, param);
|
return func(NULL, 0, param);
|
||||||
}
|
}
|
||||||
|
|
||||||
switch (pSchema->type) {
|
switch (pSchema->type) {
|
||||||
|
@ -638,10 +638,14 @@ static FORCE_INLINE int32_t MemRowAppend(const void* value, int32_t len, void* p
|
||||||
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
|
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
|
||||||
}
|
}
|
||||||
varDataSetLen(rowEnd, output);
|
varDataSetLen(rowEnd, output);
|
||||||
tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, true, pa->toffset, pa->colIdx);
|
tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx);
|
||||||
|
} else {
|
||||||
|
if (value == NULL) { // it is a null data
|
||||||
|
tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NULL, value, false, pa->toffset, pa->colIdx);
|
||||||
} else {
|
} else {
|
||||||
tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, value, false, pa->toffset, pa->colIdx);
|
tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, value, false, pa->toffset, pa->colIdx);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -657,9 +657,9 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
|
||||||
|
|
||||||
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED);
|
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED);
|
||||||
QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
|
QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
|
||||||
|
|
||||||
*rspMsg = rsp;
|
*rspMsg = rsp;
|
||||||
*dataLen = 0;
|
*dataLen = 0;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7,11 +7,11 @@ add_executable(pushServer "")
|
||||||
|
|
||||||
target_sources(transUT
|
target_sources(transUT
|
||||||
PRIVATE
|
PRIVATE
|
||||||
"transUT.cc"
|
"transUT.cpp"
|
||||||
)
|
)
|
||||||
target_sources(transportTest
|
target_sources(transportTest
|
||||||
PRIVATE
|
PRIVATE
|
||||||
"transportTests.cc"
|
"transportTests.cpp"
|
||||||
)
|
)
|
||||||
target_sources (client
|
target_sources (client
|
||||||
PRIVATE
|
PRIVATE
|
||||||
|
|
Loading…
Reference in New Issue