Merge branch '3.0' of github.com:taosdata/TDengine into 3.0_udfd
This commit is contained in:
commit
8a858cb48e
|
@ -14,24 +14,9 @@ if(${BUILD_PTHREAD})
|
|||
cat("${TD_SUPPORT_DIR}/pthread_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||
endif()
|
||||
|
||||
# iconv
|
||||
if(${BUILD_WITH_ICONV})
|
||||
cat("${TD_SUPPORT_DIR}/iconv_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||
endif()
|
||||
|
||||
# msvc regex
|
||||
if(${BUILD_MSVCREGEX})
|
||||
cat("${TD_SUPPORT_DIR}/msvcregex_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||
endif()
|
||||
|
||||
# wcwidth
|
||||
if(${BUILD_WCWIDTH})
|
||||
cat("${TD_SUPPORT_DIR}/wcwidth_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||
endif()
|
||||
|
||||
# wingetopt
|
||||
if(${BUILD_WINGETOPT})
|
||||
cat("${TD_SUPPORT_DIR}/wingetopt_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||
# gnu regex
|
||||
if(${BUILD_GNUREGEX})
|
||||
cat("${TD_SUPPORT_DIR}/gnuregex_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||
endif()
|
||||
|
||||
# googletest
|
||||
|
@ -114,27 +99,8 @@ if(${BUILD_TEST})
|
|||
target_include_directories(
|
||||
gtest
|
||||
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/cpp-stub/src>
|
||||
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/cpp-stub/src_linux>
|
||||
)
|
||||
if(${TD_WINDOWS})
|
||||
target_include_directories(
|
||||
gtest
|
||||
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/cpp-stub/src_win>
|
||||
)
|
||||
endif(${TD_WINDOWS})
|
||||
if(${TD_LINUX})
|
||||
target_include_directories(
|
||||
gtest
|
||||
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/cpp-stub/src_linux>
|
||||
)
|
||||
endif(${TD_LINUX})
|
||||
if(${TD_DARWIN})
|
||||
target_include_directories(
|
||||
gtest
|
||||
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/cpp-stub/src_darwin>
|
||||
)
|
||||
endif(${TD_DARWIN})
|
||||
|
||||
|
||||
endif(${BUILD_TEST})
|
||||
|
||||
# cJson
|
||||
|
@ -216,53 +182,6 @@ if(${BUILD_WITH_NURAFT})
|
|||
add_subdirectory(nuraft)
|
||||
endif(${BUILD_WITH_NURAFT})
|
||||
|
||||
# pthread
|
||||
if(${BUILD_PTHREAD})
|
||||
set(CMAKE_BUILD_TYPE release)
|
||||
add_definitions(-DPTW32_STATIC_LIB)
|
||||
add_subdirectory(pthread)
|
||||
set_target_properties(libpthreadVC3 PROPERTIES OUTPUT_NAME pthread)
|
||||
add_library(pthread STATIC IMPORTED GLOBAL)
|
||||
SET_PROPERTY(TARGET pthread PROPERTY IMPORTED_LOCATION ${LIBRARY_OUTPUT_PATH}/pthread.lib)
|
||||
endif()
|
||||
|
||||
# iconv
|
||||
if(${BUILD_WITH_ICONV})
|
||||
add_subdirectory(iconv)
|
||||
endif(${BUILD_WITH_ICONV})
|
||||
|
||||
# wingetopt
|
||||
if(${BUILD_WINGETOPT})
|
||||
add_subdirectory(wingetopt)
|
||||
endif(${BUILD_WINGETOPT})
|
||||
|
||||
# msvcregex
|
||||
if(${BUILD_MSVCREGEX})
|
||||
add_library(msvcregex STATIC "")
|
||||
target_sources(msvcregex
|
||||
PRIVATE "msvcregex/regex.c"
|
||||
)
|
||||
target_include_directories(msvcregex
|
||||
PRIVATE "msvcregex"
|
||||
)
|
||||
target_link_libraries(msvcregex
|
||||
INTERFACE Shell32
|
||||
)
|
||||
SET_TARGET_PROPERTIES(msvcregex PROPERTIES OUTPUT_NAME msvcregex)
|
||||
endif(${BUILD_MSVCREGEX})
|
||||
|
||||
# msvcregex
|
||||
if(${BUILD_WCWIDTH})
|
||||
add_library(wcwidth STATIC "")
|
||||
target_sources(wcwidth
|
||||
PRIVATE "wcwidth/wcwidth.c"
|
||||
)
|
||||
target_include_directories(wcwidth
|
||||
PRIVATE "wcwidth"
|
||||
)
|
||||
SET_TARGET_PROPERTIES(wcwidth PROPERTIES OUTPUT_NAME wcwidth)
|
||||
endif(${BUILD_WCWIDTH})
|
||||
|
||||
# CRAFT
|
||||
if(${BUILD_WITH_CRAFT})
|
||||
add_library(craft STATIC IMPORTED GLOBAL)
|
||||
|
@ -319,12 +238,8 @@ if(${BUILD_WITH_SQLITE})
|
|||
target_link_libraries(sqlite
|
||||
INTERFACE m
|
||||
INTERFACE pthread
|
||||
INTERFACE dl
|
||||
)
|
||||
if(NOT TD_WINDOWS)
|
||||
target_link_libraries(sqlite
|
||||
INTERFACE dl
|
||||
)
|
||||
endif(NOT TD_WINDOWS)
|
||||
endif(${BUILD_WITH_SQLITE})
|
||||
|
||||
# pthread
|
||||
|
|
|
@ -1,9 +1,2 @@
|
|||
add_executable(simulate_vnode "simulate_vnode.c")
|
||||
target_link_libraries(simulate_vnode PUBLIC craft lz4 uv_a)
|
||||
if(${BUILD_WINGETOPT})
|
||||
target_link_libraries(simulate_vnode PUBLIC wingetopt)
|
||||
target_include_directories(
|
||||
simulate_vnode
|
||||
PUBLIC "${TD_SOURCE_DIR}/contrib/wingetopt/src"
|
||||
)
|
||||
endif()
|
||||
target_link_libraries(simulate_vnode PUBLIC craft lz4 uv_a)
|
|
@ -6,39 +6,43 @@
|
|||
#define POINTER_SHIFT(ptr, s) ((void *)(((char *)ptr) + (s)))
|
||||
#define POINTER_DISTANCE(pa, pb) ((char *)(pb) - (char *)(pa))
|
||||
|
||||
static inline void tPutA(void **buf, uint64_t val) {
|
||||
memcpy(buf, &val, sizeof(val));
|
||||
*buf = POINTER_SHIFT(buf, sizeof(val));
|
||||
}
|
||||
#define tPutA(buf, val) \
|
||||
({ \
|
||||
memcpy(buf, &val, sizeof(val)); \
|
||||
POINTER_SHIFT(buf, sizeof(val)); \
|
||||
})
|
||||
|
||||
static inline void tPutB(void **buf, uint64_t val) {
|
||||
((uint8_t *)buf)[7] = ((val) >> 56) & 0xff;
|
||||
((uint8_t *)buf)[6] = ((val) >> 48) & 0xff;
|
||||
((uint8_t *)buf)[5] = ((val) >> 40) & 0xff;
|
||||
((uint8_t *)buf)[4] = ((val) >> 32) & 0xff;
|
||||
((uint8_t *)buf)[3] = ((val) >> 24) & 0xff;
|
||||
((uint8_t *)buf)[2] = ((val) >> 16) & 0xff;
|
||||
((uint8_t *)buf)[1] = ((val) >> 8) & 0xff;
|
||||
((uint8_t *)buf)[0] = (val)&0xff;
|
||||
*buf = POINTER_SHIFT(buf, sizeof(val));
|
||||
}
|
||||
#define tPutB(buf, val) \
|
||||
({ \
|
||||
((uint8_t *)buf)[7] = ((val) >> 56) & 0xff; \
|
||||
((uint8_t *)buf)[6] = ((val) >> 48) & 0xff; \
|
||||
((uint8_t *)buf)[5] = ((val) >> 40) & 0xff; \
|
||||
((uint8_t *)buf)[4] = ((val) >> 32) & 0xff; \
|
||||
((uint8_t *)buf)[3] = ((val) >> 24) & 0xff; \
|
||||
((uint8_t *)buf)[2] = ((val) >> 16) & 0xff; \
|
||||
((uint8_t *)buf)[1] = ((val) >> 8) & 0xff; \
|
||||
((uint8_t *)buf)[0] = (val)&0xff; \
|
||||
POINTER_SHIFT(buf, sizeof(val)); \
|
||||
})
|
||||
|
||||
static inline void tPutC(void **buf, uint64_t val) {
|
||||
if (buf) {
|
||||
((uint64_t *)buf)[0] = (val);
|
||||
POINTER_SHIFT(buf, sizeof(val));
|
||||
}
|
||||
*buf = NULL;
|
||||
}
|
||||
#define tPutC(buf, val) \
|
||||
({ \
|
||||
if (buf) { \
|
||||
((uint64_t *)buf)[0] = (val); \
|
||||
POINTER_SHIFT(buf, sizeof(val)); \
|
||||
} \
|
||||
NULL; \
|
||||
})
|
||||
|
||||
static inline void tPutD(void **buf, uint64_t val) {
|
||||
uint64_t tmp = val;
|
||||
for (size_t i = 0; i < sizeof(val); i++) {
|
||||
((uint8_t *)buf)[i] = tmp & 0xff;
|
||||
tmp >>= 8;
|
||||
}
|
||||
*buf = POINTER_SHIFT(buf, sizeof(val));
|
||||
}
|
||||
#define tPutD(buf, val) \
|
||||
({ \
|
||||
uint64_t tmp = val; \
|
||||
for (size_t i = 0; i < sizeof(val); i++) { \
|
||||
((uint8_t *)buf)[i] = tmp & 0xff; \
|
||||
tmp >>= 8; \
|
||||
} \
|
||||
POINTER_SHIFT(buf, sizeof(val)); \
|
||||
})
|
||||
|
||||
static inline void tPutE(void **buf, uint64_t val) {
|
||||
if (buf) {
|
||||
|
@ -57,7 +61,7 @@ static void func(T t) {
|
|||
switch (t) {
|
||||
case A:
|
||||
for (size_t i = 0; i < 10 * 1024l * 1024l * 1024l; i++) {
|
||||
tPutA(pBuf, val);
|
||||
pBuf = tPutA(pBuf, val);
|
||||
if (POINTER_DISTANCE(buf, pBuf) == 1024) {
|
||||
pBuf = buf;
|
||||
}
|
||||
|
@ -65,7 +69,7 @@ static void func(T t) {
|
|||
break;
|
||||
case B:
|
||||
for (size_t i = 0; i < 10 * 1024l * 1024l * 1024l; i++) {
|
||||
tPutB(pBuf, val);
|
||||
pBuf = tPutB(pBuf, val);
|
||||
if (POINTER_DISTANCE(buf, pBuf) == 1024) {
|
||||
pBuf = buf;
|
||||
}
|
||||
|
@ -73,7 +77,7 @@ static void func(T t) {
|
|||
break;
|
||||
case C:
|
||||
for (size_t i = 0; i < 10 * 1024l * 1024l * 1024l; i++) {
|
||||
tPutC(pBuf, val);
|
||||
pBuf = tPutC(pBuf, val);
|
||||
if (POINTER_DISTANCE(buf, pBuf) == 1024) {
|
||||
pBuf = buf;
|
||||
}
|
||||
|
@ -81,7 +85,7 @@ static void func(T t) {
|
|||
break;
|
||||
case D:
|
||||
for (size_t i = 0; i < 10 * 1024l * 1024l * 1024l; i++) {
|
||||
tPutD(pBuf, val);
|
||||
pBuf = tPutD(pBuf, val);
|
||||
if (POINTER_DISTANCE(buf, pBuf) == 1024) {
|
||||
pBuf = buf;
|
||||
}
|
||||
|
|
|
@ -238,10 +238,16 @@ static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32
|
|||
|
||||
static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols,
|
||||
int8_t needCompress) {
|
||||
int32_t* colSizes = (int32_t*)data;
|
||||
int32_t* actualLen = (int32_t*) data;
|
||||
data += sizeof(int32_t);
|
||||
|
||||
uint64_t* groupId = (uint64_t*) data;
|
||||
data += sizeof(uint64_t);
|
||||
|
||||
int32_t* colSizes = (int32_t*)data;
|
||||
data += numOfCols * sizeof(int32_t);
|
||||
*dataLen = (numOfCols * sizeof(int32_t));
|
||||
|
||||
*dataLen = (numOfCols * sizeof(int32_t) + sizeof(uint64_t) + sizeof(int32_t));
|
||||
|
||||
int32_t numOfRows = pBlock->info.rows;
|
||||
for (int32_t col = 0; col < numOfCols; ++col) {
|
||||
|
@ -273,6 +279,9 @@ static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* da
|
|||
|
||||
colSizes[col] = htonl(colSizes[col]);
|
||||
}
|
||||
|
||||
*actualLen = *dataLen;
|
||||
*groupId = pBlock->info.groupId;
|
||||
}
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -37,7 +37,7 @@ typedef struct SFuncExecEnv {
|
|||
typedef bool (*FExecGetEnv)(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
typedef bool (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);
|
||||
typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx);
|
||||
typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock, int32_t slotId);
|
||||
typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
|
||||
typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
|
||||
typedef struct SScalarFuncExecFuncs {
|
||||
|
@ -141,8 +141,7 @@ struct SResultRowEntryInfo;
|
|||
|
||||
//for selectivity query, the corresponding tag value is assigned if the data is qualified
|
||||
typedef struct SSubsidiaryResInfo {
|
||||
int16_t bufLen; // keep the tags data for top/bottom query result
|
||||
int16_t numOfCols;
|
||||
int16_t num;
|
||||
struct SqlFunctionCtx **pCtx;
|
||||
} SSubsidiaryResInfo;
|
||||
|
||||
|
@ -187,8 +186,8 @@ typedef struct SqlFunctionCtx {
|
|||
uint8_t currentStage; // record current running step, default: 0
|
||||
bool isAggSet;
|
||||
int64_t startTs; // timestamp range of current query when function is executed on a specific data block, TODO remove it
|
||||
/////////////////////////////////////////////////////////////////
|
||||
bool stableQuery;
|
||||
/////////////////////////////////////////////////////////////////
|
||||
int16_t functionId; // function id
|
||||
char * pOutput; // final result output buffer, point to sdata->data
|
||||
int32_t numOfParams;
|
||||
|
@ -198,11 +197,15 @@ typedef struct SqlFunctionCtx {
|
|||
int32_t offset;
|
||||
SVariant tag;
|
||||
struct SResultRowEntryInfo *resultInfo;
|
||||
SSubsidiaryResInfo subsidiaryRes;
|
||||
SPoint1 start;
|
||||
SPoint1 end;
|
||||
SFuncExecFuncs fpSet;
|
||||
SScalarFuncExecFuncs sfp;
|
||||
SSubsidiaryResInfo subsidiaries;
|
||||
SPoint1 start;
|
||||
SPoint1 end;
|
||||
SFuncExecFuncs fpSet;
|
||||
SScalarFuncExecFuncs sfp;
|
||||
SExprInfo *pExpr;
|
||||
struct SDiskbasedBuf *pBuf;
|
||||
struct SSDataBlock *pSrcBlock;
|
||||
int32_t curBufPage;
|
||||
} SqlFunctionCtx;
|
||||
|
||||
enum {
|
||||
|
|
|
@ -105,9 +105,9 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass,
|
|||
epSet.epSet.eps[0].port = port;
|
||||
}
|
||||
|
||||
char* key = getClusterKey(user, secretEncrypt, ip, port);
|
||||
SAppInstInfo** pInst = NULL;
|
||||
char* key = getClusterKey(user, secretEncrypt, ip, port);
|
||||
|
||||
SAppInstInfo** pInst = NULL;
|
||||
taosThreadMutexLock(&appInfo.mutex);
|
||||
|
||||
pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
|
||||
|
@ -840,10 +840,21 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t* colLength = (int32_t*)pResultInfo->pData;
|
||||
char* pStart = ((char*)pResultInfo->pData) + sizeof(int32_t) * numOfCols;
|
||||
char* p = (char*) pResultInfo->pData;
|
||||
|
||||
int32_t dataLen = *(int32_t*) p;
|
||||
p += sizeof(int32_t);
|
||||
|
||||
uint64_t groupId = *(uint64_t*) p;
|
||||
p += sizeof(uint64_t);
|
||||
|
||||
int32_t* colLength = (int32_t*)p;
|
||||
p += sizeof(int32_t) * numOfCols;
|
||||
|
||||
char* pStart = p;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
colLength[i] = htonl(colLength[i]);
|
||||
ASSERT(colLength[i] < dataLen);
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
|
||||
pResultInfo->pCol[i].offset = (int32_t*)pStart;
|
||||
|
|
|
@ -644,12 +644,12 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock) {
|
|||
|
||||
/**
|
||||
* @refitem blockDataToBuf for the meta size
|
||||
*
|
||||
* @param pBlock
|
||||
* @return
|
||||
*/
|
||||
size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock) {
|
||||
return sizeof(int32_t) + pBlock->info.numOfCols * sizeof(int32_t);
|
||||
// | total rows/total length | block group id | each column length |
|
||||
return sizeof(int32_t) + sizeof(uint64_t) + pBlock->info.numOfCols * sizeof(int32_t);
|
||||
}
|
||||
|
||||
double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
|
||||
|
@ -1219,7 +1219,6 @@ void colDataDestroy(SColumnInfoData* pColData) {
|
|||
taosMemoryFree(pColData->pData);
|
||||
}
|
||||
|
||||
|
||||
static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
|
||||
int32_t len = BitmapLen(total);
|
||||
|
||||
|
|
|
@ -3223,8 +3223,13 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDa
|
|||
tsdbDebug("data block generated, uid:%" PRIu64 " numOfRows:%d, tsrange:%" PRId64 " - %" PRId64 " %s", uid, cur->rows,
|
||||
cur->win.skey, cur->win.ekey, pHandle->idStr);
|
||||
|
||||
// pDataBlockInfo->uid = uid; // block Id may be over write by assigning uid fro this data block. Do NOT assign
|
||||
// the table uid
|
||||
pDataBlockInfo->uid = uid;
|
||||
|
||||
#if 0
|
||||
// for multi-group data query processing test purpose
|
||||
pDataBlockInfo->groupId = uid;
|
||||
#endif
|
||||
|
||||
pDataBlockInfo->rows = cur->rows;
|
||||
pDataBlockInfo->window = cur->win;
|
||||
pDataBlockInfo->numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pHandle));
|
||||
|
|
|
@ -304,8 +304,8 @@ int32_t qExplainResAppendRow(SExplainCtx *ctx, char *tbuf, int32_t len, int32_t
|
|||
|
||||
memcpy(row.buf, tbuf, len);
|
||||
row.level = level;
|
||||
row.len = len;
|
||||
ctx->dataSize += len;
|
||||
row.len = len;
|
||||
ctx->dataSize += row.len;
|
||||
|
||||
if (NULL == taosArrayPush(ctx->rows, &row)) {
|
||||
qError("taosArrayPush row to explain res rows failed");
|
||||
|
@ -756,7 +756,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
|
|||
}
|
||||
|
||||
int32_t colNum = 1;
|
||||
int32_t rspSize = sizeof(SRetrieveTableRsp) + sizeof(int32_t) * colNum + sizeof(int32_t) * rowNum + pCtx->dataSize;
|
||||
int32_t rspSize = sizeof(SRetrieveTableRsp) + sizeof(int32_t) + sizeof(uint64_t) + sizeof(int32_t) * colNum + sizeof(int32_t) * rowNum + pCtx->dataSize;
|
||||
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, rspSize);
|
||||
if (NULL == rsp) {
|
||||
qError("malloc SRetrieveTableRsp failed, size:%d", rspSize);
|
||||
|
@ -766,29 +766,38 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
|
|||
rsp->completed = 1;
|
||||
rsp->numOfRows = htonl(rowNum);
|
||||
|
||||
*(int32_t *)rsp->data = htonl(pCtx->dataSize);
|
||||
// payload length
|
||||
*(int32_t *)rsp->data = sizeof(int32_t) + sizeof(uint64_t) + sizeof(int32_t) * colNum + sizeof(int32_t) * rowNum + pCtx->dataSize;
|
||||
|
||||
int32_t *offset = (int32_t *)((char *)rsp->data + sizeof(int32_t));
|
||||
// group id
|
||||
*(uint64_t*)(rsp->data + sizeof(int32_t)) = 0;
|
||||
|
||||
// column length
|
||||
int32_t* colLength = (int32_t *)(rsp->data + sizeof(int32_t) + sizeof(uint64_t));
|
||||
|
||||
// varchar column offset segment
|
||||
int32_t *offset = (int32_t *)((char *)colLength + sizeof(int32_t));
|
||||
|
||||
// varchar data real payload
|
||||
char *data = (char *)(offset + rowNum);
|
||||
int32_t tOffset = 0;
|
||||
|
||||
|
||||
char* start = data;
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
SQueryExplainRowInfo *row = taosArrayGet(pCtx->rows, i);
|
||||
*offset = tOffset;
|
||||
tOffset += row->len;
|
||||
offset[i] = data - start;
|
||||
|
||||
memcpy(data, row->buf, row->len);
|
||||
|
||||
++offset;
|
||||
varDataCopy(data, row->buf);
|
||||
ASSERT(varDataTLen(row->buf) == row->len);
|
||||
data += row->len;
|
||||
}
|
||||
|
||||
*pRsp = rsp;
|
||||
*colLength = htonl(data - start);
|
||||
rsp->compLen = htonl(rspSize);
|
||||
|
||||
*pRsp = rsp;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t qExplainPrepareCtx(SQueryPlan *pDag, SExplainCtx **pCtx) {
|
||||
int32_t code = 0;
|
||||
SNodeListNode *plans = NULL;
|
||||
|
@ -895,9 +904,7 @@ int32_t qExplainAppendPlanRows(SExplainCtx *pCtx) {
|
|||
|
||||
int32_t qExplainGenerateRsp(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp) {
|
||||
QRY_ERR_RET(qExplainAppendGroupResRows(pCtx, pCtx->rootGroupId, 0));
|
||||
|
||||
QRY_ERR_RET(qExplainAppendPlanRows(pCtx));
|
||||
|
||||
QRY_ERR_RET(qExplainGetRspFromCtx(pCtx, pRsp));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -967,13 +974,10 @@ int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp) {
|
|||
SExplainCtx *pCtx = NULL;
|
||||
|
||||
QRY_ERR_RET(qExplainPrepareCtx(pDag, &pCtx));
|
||||
|
||||
QRY_ERR_JRET(qExplainGenerateRsp(pCtx, pRsp));
|
||||
|
||||
_return:
|
||||
|
||||
qExplainFreeCtx(pCtx);
|
||||
|
||||
QRY_RET(code);
|
||||
}
|
||||
|
||||
|
|
|
@ -40,8 +40,6 @@
|
|||
|
||||
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str)
|
||||
|
||||
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
|
||||
|
||||
typedef struct SGroupResInfo {
|
||||
int32_t totalGroup;
|
||||
int32_t currentGroup;
|
||||
|
@ -68,11 +66,16 @@ typedef struct SResultRowPosition {
|
|||
int32_t offset;
|
||||
} SResultRowPosition;
|
||||
|
||||
typedef struct SResKeyPos {
|
||||
SResultRowPosition pos;
|
||||
uint64_t groupId;
|
||||
char key[];
|
||||
} SResKeyPos;
|
||||
|
||||
typedef struct SResultRowInfo {
|
||||
SResultRowPosition *pPosition;
|
||||
int32_t size; // number of result set
|
||||
int32_t capacity; // max capacity
|
||||
// int32_t curPos; // current active result row index of pResult list
|
||||
SResultRowPosition cur;
|
||||
} SResultRowInfo;
|
||||
|
||||
|
@ -135,7 +138,7 @@ typedef struct {
|
|||
int32_t colId;
|
||||
} SStddevInterResult;
|
||||
|
||||
void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo);
|
||||
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, bool sortGroupResult);
|
||||
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
|
||||
|
||||
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
|
||||
|
|
|
@ -347,10 +347,11 @@ typedef struct STableScanInfo {
|
|||
} STableScanInfo;
|
||||
|
||||
typedef struct STagScanInfo {
|
||||
SColumnInfo* pCols;
|
||||
SSDataBlock* pRes;
|
||||
SColumnInfo *pCols;
|
||||
SSDataBlock *pRes;
|
||||
int32_t totalTables;
|
||||
int32_t curPos;
|
||||
void *pReader;
|
||||
} STagScanInfo;
|
||||
|
||||
typedef struct SStreamBlockScanInfo {
|
||||
|
@ -376,13 +377,11 @@ typedef struct SSysTableScanInfo {
|
|||
SEpSet epSet;
|
||||
tsem_t ready;
|
||||
|
||||
int32_t accountId;
|
||||
bool showRewrite;
|
||||
SNode* pCondition; // db_name filter condition, to discard data that are not in current database
|
||||
void* pCur; // cursor for iterate the local table meta store.
|
||||
SArray* scanCols; // SArray<int16_t> scan column id list
|
||||
|
||||
// int32_t type; // show type, TODO remove it
|
||||
int32_t accountId;
|
||||
bool showRewrite;
|
||||
SNode* pCondition; // db_name filter condition, to discard data that are not in current database
|
||||
void* pCur; // cursor for iterate the local table meta store.
|
||||
SArray* scanCols; // SArray<int16_t> scan column id list
|
||||
SName name;
|
||||
SSDataBlock* pRes;
|
||||
int32_t capacity;
|
||||
|
@ -628,7 +627,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
|
|||
|
||||
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t dataLoadFlag, int32_t repeatTime,
|
||||
SOperatorInfo* createTableScanOperatorInfo(void* pReaderHandle, int32_t order, int32_t numOfCols, int32_t dataLoadFlag, int32_t repeatTime,
|
||||
int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition,
|
||||
SInterval* pInterval, double ratio, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
|
||||
|
@ -668,12 +667,12 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createTagScanOperatorInfo(void* pReaderHandle, SExprInfo* pExpr, int32_t numOfOutput, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
#if 0
|
||||
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
|
||||
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
|
||||
SExprInfo* pExpr, int32_t numOfOutput);
|
||||
SOperatorInfo* createTagScanOperatorInfo(SReaderHandle* pReaderHandle, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
#endif
|
||||
|
||||
void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
|
||||
|
|
|
@ -64,10 +64,10 @@ static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) {
|
|||
}
|
||||
|
||||
// data format:
|
||||
// +----------------+--------------------------------------+-------------+-----------+-------------+-----------+
|
||||
// |SDataCacheEntry | column#1 length, column#2 length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | ....
|
||||
// | | sizeof(int32_t) * numOfCols | actual size | | actual size | |
|
||||
// +----------------+--------------------------------------+-------------+-----------+-------------+-----------+
|
||||
// +----------------+--------------+----------+--------------------------------------+-------------+-----------+-------------+-----------+
|
||||
// |SDataCacheEntry | total length | group id | column#1 length, column#2 length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | ....
|
||||
// | | (4 bytes) |(8 bytes) | sizeof(int32_t) * numOfCols | actual size | | actual size | |
|
||||
// +----------------+--------------+----------+--------------------------------------+-------------+-----------+-------------+-----------+
|
||||
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is
|
||||
// recorded in the first segment, next to the struct header
|
||||
static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
|
||||
|
|
|
@ -186,12 +186,50 @@ void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) {
|
|||
pGroupResInfo->index = 0;
|
||||
}
|
||||
|
||||
void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo) {
|
||||
static int32_t resultrowCompar1(const void* p1, const void* p2) {
|
||||
SResKeyPos* pp1 = *(SResKeyPos**) p1;
|
||||
SResKeyPos* pp2 = *(SResKeyPos**) p2;
|
||||
|
||||
if (pp1->groupId == pp2->groupId) {
|
||||
int64_t pts1 = *(int64_t*) pp1->key;
|
||||
int64_t pts2 = *(int64_t*) pp2->key;
|
||||
|
||||
if (pts1 == pts2) {
|
||||
return 0;
|
||||
} else {
|
||||
return pts1 < pts2? -1:1;
|
||||
}
|
||||
} else {
|
||||
return pp1->groupId < pp2->groupId? -1:1;
|
||||
}
|
||||
}
|
||||
|
||||
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, bool sortGroupResult) {
|
||||
if (pGroupResInfo->pRows != NULL) {
|
||||
taosArrayDestroy(pGroupResInfo->pRows);
|
||||
}
|
||||
|
||||
pGroupResInfo->pRows = taosArrayFromList(pResultInfo->pPosition, pResultInfo->size, sizeof(SResultRowPosition));
|
||||
// extract the result rows information from the hash map
|
||||
void* pData = NULL;
|
||||
pGroupResInfo->pRows = taosArrayInit(10, POINTER_BYTES);
|
||||
|
||||
size_t keyLen = 0;
|
||||
while((pData = taosHashIterate(pHashmap, pData)) != NULL) {
|
||||
void* key = taosHashGetKey(pData, &keyLen);
|
||||
|
||||
SResKeyPos* p = taosMemoryMalloc(keyLen + sizeof(SResultRowPosition));
|
||||
|
||||
p->groupId = *(uint64_t*) key;
|
||||
p->pos = *(SResultRowPosition*) pData;
|
||||
memcpy(p->key, key + sizeof(uint64_t), keyLen - sizeof(uint64_t));
|
||||
|
||||
taosArrayPush(pGroupResInfo->pRows, &p);
|
||||
}
|
||||
|
||||
if (sortGroupResult) {
|
||||
qsort(pGroupResInfo->pRows->pData, taosArrayGetSize(pGroupResInfo->pRows), POINTER_BYTES, resultrowCompar1);
|
||||
}
|
||||
|
||||
pGroupResInfo->index = 0;
|
||||
assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo));
|
||||
}
|
||||
|
|
|
@ -235,7 +235,6 @@ static int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDi
|
|||
|
||||
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
|
||||
static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo);
|
||||
static void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable);
|
||||
static void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, uint64_t groupId,
|
||||
SExecTaskInfo* pTaskInfo);
|
||||
|
||||
|
@ -298,26 +297,6 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
|
|||
return pBlock;
|
||||
}
|
||||
|
||||
static bool isSelectivityWithTagsQuery(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
||||
return true;
|
||||
// bool hasTags = false;
|
||||
// int32_t numOfSelectivity = 0;
|
||||
//
|
||||
// for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
// int32_t functId = pCtx[i].functionId;
|
||||
// if (functId == FUNCTION_TAG_DUMMY || functId == FUNCTION_TS_DUMMY) {
|
||||
// hasTags = true;
|
||||
// continue;
|
||||
// }
|
||||
//
|
||||
// if ((aAggs[functId].status & FUNCSTATE_SELECTIVITY) != 0) {
|
||||
// numOfSelectivity++;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// return (numOfSelectivity > 0 && hasTags);
|
||||
}
|
||||
|
||||
static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) {
|
||||
if (TSDB_COL_IS_TAG(pColumn->flag) || TSDB_COL_IS_UD_COL(pColumn->flag) ||
|
||||
pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
|
@ -433,6 +412,13 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId,
|
|||
return pResultRow;
|
||||
}
|
||||
|
||||
/**
|
||||
* the struct of key in hash table
|
||||
* +----------+---------------+
|
||||
* | group id | key data |
|
||||
* | 8 bytes | actual length |
|
||||
* +----------+---------------+
|
||||
*/
|
||||
static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, int64_t uid,
|
||||
char* pData, int16_t bytes, bool masterscan, uint64_t groupId,
|
||||
SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup) {
|
||||
|
@ -1098,8 +1084,9 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
|
||||
pCtx[i].order = order;
|
||||
pCtx[i].size = pBlock->info.rows;
|
||||
pCtx[i].order = order;
|
||||
pCtx[i].size = pBlock->info.rows;
|
||||
pCtx[i].pSrcBlock = pBlock;
|
||||
pCtx[i].currentStage = MAIN_SCAN;
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx[i].input;
|
||||
|
@ -1474,7 +1461,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
|
|||
|
||||
SArray* pUpdated = NULL;
|
||||
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
|
||||
pUpdated = taosArrayInit(4, sizeof(SResultRowPosition));
|
||||
pUpdated = taosArrayInit(4, POINTER_BYTES);
|
||||
}
|
||||
|
||||
int32_t step = 1;
|
||||
|
@ -1486,8 +1473,6 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
|
|||
if (pSDataBlock->pDataBlock != NULL) {
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
tsCols = (int64_t*)pColDataInfo->pData;
|
||||
// assert(tsCols[0] == pSDataBlock->info.window.skey && tsCols[pSDataBlock->info.rows - 1] ==
|
||||
// pSDataBlock->info.window.ekey);
|
||||
}
|
||||
|
||||
int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1);
|
||||
|
@ -1506,7 +1491,11 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
|
|||
}
|
||||
|
||||
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
|
||||
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
|
||||
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
|
||||
pos->groupId = tableGroupId;
|
||||
pos->pos = (SResultRowPosition) {.pageId = pResult->pageId, .offset = pResult->offset};
|
||||
*(int64_t*) pos->key = pResult->win.skey;
|
||||
|
||||
taosArrayPush(pUpdated, &pos);
|
||||
}
|
||||
|
||||
|
@ -1580,7 +1569,11 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
|
|||
}
|
||||
|
||||
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
|
||||
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
|
||||
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
|
||||
pos->groupId = tableGroupId;
|
||||
pos->pos = (SResultRowPosition) {.pageId = pResult->pageId, .offset = pResult->offset};
|
||||
*(int64_t*) pos->key = pResult->win.skey;
|
||||
|
||||
taosArrayPush(pUpdated, &pos);
|
||||
}
|
||||
|
||||
|
@ -1844,10 +1837,6 @@ void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock*
|
|||
|
||||
// set the output buffer for the selectivity + tag query
|
||||
static int32_t setCtxTagColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
||||
if (!isSelectivityWithTagsQuery(pCtx, numOfOutput)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t num = 0;
|
||||
int16_t tagLen = 0;
|
||||
|
||||
|
@ -1874,9 +1863,8 @@ static int32_t setCtxTagColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
|||
}
|
||||
}
|
||||
if (p != NULL) {
|
||||
p->subsidiaryRes.pCtx = pTagCtx;
|
||||
p->subsidiaryRes.numOfCols = num;
|
||||
p->subsidiaryRes.bufLen = tagLen;
|
||||
p->subsidiaries.pCtx = pTagCtx;
|
||||
p->subsidiaries.num = num;
|
||||
} else {
|
||||
taosMemoryFreeClear(pTagCtx);
|
||||
}
|
||||
|
@ -1903,6 +1891,9 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
|
|||
SqlFunctionCtx* pCtx = &pFuncCtx[i];
|
||||
|
||||
pCtx->functionId = -1;
|
||||
pCtx->curBufPage = -1;
|
||||
pCtx->pExpr = pExpr;
|
||||
|
||||
if (pExpr->pExpr->nodeType == QUERY_NODE_FUNCTION) {
|
||||
SFuncExecEnv env = {0};
|
||||
pCtx->functionId = pExpr->pExpr->_function.pFunctNode->funcId;
|
||||
|
@ -1930,9 +1921,9 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
|
|||
pCtx->pTsOutput = NULL;
|
||||
pCtx->resDataInfo.bytes = pFunct->resSchema.bytes;
|
||||
pCtx->resDataInfo.type = pFunct->resSchema.type;
|
||||
pCtx->order = TSDB_ORDER_ASC;
|
||||
pCtx->order = TSDB_ORDER_ASC;
|
||||
pCtx->start.key = INT64_MIN;
|
||||
pCtx->end.key = INT64_MIN;
|
||||
pCtx->end.key = INT64_MIN;
|
||||
pCtx->numOfParams = pExpr->base.numOfParams;
|
||||
|
||||
pCtx->param = pFunct->pParam;
|
||||
|
@ -1992,7 +1983,7 @@ static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
|||
}
|
||||
|
||||
taosVariantDestroy(&pCtx[i].tag);
|
||||
taosMemoryFreeClear(pCtx[i].subsidiaryRes.pCtx);
|
||||
taosMemoryFreeClear(pCtx[i].subsidiaries.pCtx);
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(pCtx);
|
||||
|
@ -2095,25 +2086,6 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
|
|||
return status;
|
||||
}
|
||||
|
||||
static void doUpdateLastKey(STaskAttr* pQueryAttr) {
|
||||
STimeWindow* win = &pQueryAttr->window;
|
||||
|
||||
size_t num = taosArrayGetSize(pQueryAttr->tableGroupInfo.pGroupList);
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SArray* p1 = taosArrayGetP(pQueryAttr->tableGroupInfo.pGroupList, i);
|
||||
|
||||
size_t len = taosArrayGetSize(p1);
|
||||
for (int32_t j = 0; j < len; ++j) {
|
||||
// STableKeyInfo* pInfo = taosArrayGet(p1, j);
|
||||
//
|
||||
// // update the new lastkey if it is equalled to the value of the old skey
|
||||
// if (pInfo->lastKey == win->ekey) {
|
||||
// pInfo->lastKey = win->skey;
|
||||
// }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableReq* pQueryMsg, bool stableQuery) {
|
||||
// STaskAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
|
||||
//
|
||||
|
@ -2848,6 +2820,7 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
|
|||
}
|
||||
}
|
||||
|
||||
// todo merged with the build group result.
|
||||
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf,
|
||||
SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) {
|
||||
for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
|
||||
|
@ -2882,40 +2855,36 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD
|
|||
}
|
||||
}
|
||||
|
||||
// todo merged with the build group result.
|
||||
void finalizeUpdatedResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SArray* pUpdateList,
|
||||
int32_t* rowCellInfoOffset) {
|
||||
size_t num = taosArrayGetSize(pUpdateList);
|
||||
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SResultRowPosition* pPos = taosArrayGet(pUpdateList, i);
|
||||
|
||||
SFilePage* bufPage = getBufPage(pBuf, pPos->pageId);
|
||||
SResultRow* pRow = (SResultRow*)((char*)bufPage + pPos->offset);
|
||||
SResKeyPos * pPos = taosArrayGetP(pUpdateList, i);
|
||||
|
||||
SFilePage* bufPage = getBufPage(pBuf, pPos->pos.pageId);
|
||||
SResultRow* pRow = (SResultRow*)((char*)bufPage + pPos->pos.offset);
|
||||
//
|
||||
for (int32_t j = 0; j < numOfOutput; ++j) {
|
||||
pCtx[j].resultInfo = getResultCell(pRow, j, rowCellInfoOffset);
|
||||
|
||||
//
|
||||
struct SResultRowEntryInfo* pResInfo = pCtx[j].resultInfo;
|
||||
if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pCtx[j].fpSet.process) { // TODO set the dummy function.
|
||||
// pCtx[j].fpSet.finalize(&pCtx[j]);
|
||||
pResInfo->initialized = true;
|
||||
}
|
||||
|
||||
// if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) {
|
||||
// continue;
|
||||
// }
|
||||
//
|
||||
// if (pCtx[j].fpSet.process) { // TODO set the dummy function.
|
||||
//// pCtx[j].fpSet.finalize(&pCtx[j]);
|
||||
// pResInfo->initialized = true;
|
||||
// }
|
||||
//
|
||||
if (pRow->numOfRows < pResInfo->numOfRes) {
|
||||
pRow->numOfRows = pResInfo->numOfRes;
|
||||
}
|
||||
}
|
||||
|
||||
releaseBufPage(pBuf, bufPage);
|
||||
/*
|
||||
* set the number of output results for group by normal columns, the number of output rows usually is 1 except
|
||||
* the top and bottom query
|
||||
*/
|
||||
// buf->numOfRows = (uint16_t)getNumOfResult(pCtx, numOfOutput);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3062,33 +3031,6 @@ void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* p
|
|||
pAggInfo->groupId = groupId;
|
||||
}
|
||||
|
||||
void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable) {
|
||||
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||
|
||||
SExprBasicInfo* pExpr = &pExprInfo->base;
|
||||
// if (pQueryAttr->stableQuery && (pRuntimeEnv->pTsBuf != NULL) &&
|
||||
// (pExpr->functionId == FUNCTION_TS || pExpr->functionId == FUNCTION_PRJ) &&
|
||||
// (pExpr->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_ID)) {
|
||||
// assert(pExpr->numOfParams == 1);
|
||||
//
|
||||
// int16_t tagColId = (int16_t)pExprInfo->base.param[0].i;
|
||||
// SColumnInfo* pColInfo = doGetTagColumnInfoById(pQueryAttr->tagColList, pQueryAttr->numOfTags, tagColId);
|
||||
//
|
||||
// doSetTagValueInParam(pTable, tagColId, &pCtx->tag, pColInfo->type, pColInfo->bytes);
|
||||
//
|
||||
// int16_t tagType = pCtx[0].tag.nType;
|
||||
// if (tagType == TSDB_DATA_TYPE_BINARY || tagType == TSDB_DATA_TYPE_NCHAR) {
|
||||
// //qDebug("QInfo:0x%"PRIx64" set tag value for join comparison, colId:%" PRId64 ", val:%s",
|
||||
// GET_TASKID(pRuntimeEnv),
|
||||
//// pExprInfo->base.param[0].i, pCtx[0].tag.pz);
|
||||
// } else {
|
||||
// //qDebug("QInfo:0x%"PRIx64" set tag value for join comparison, colId:%" PRId64 ", val:%" PRId64,
|
||||
// GET_TASKID(pRuntimeEnv),
|
||||
//// pExprInfo->base.param[0].i, pCtx[0].tag.i);
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
||||
/*
|
||||
* There are two cases to handle:
|
||||
*
|
||||
|
@ -3131,7 +3073,6 @@ void setIntervalQueryRange(STableQueryInfo* pTableQueryInfo, TSKEY key, STimeWin
|
|||
}
|
||||
|
||||
/**
|
||||
* copyToOutputBuf support copy data in ascending/descending order
|
||||
* For interval query of both super table and table, copy the data in ascending order, since the output results are
|
||||
* ordered in SWindowResutl already. While handling the group by query for both table and super table,
|
||||
* all group result are completed already.
|
||||
|
@ -3159,10 +3100,10 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased
|
|||
}
|
||||
|
||||
for (int32_t i = start; (i < numOfRows) && (i >= 0); i += step) {
|
||||
SResultRowPosition* pPos = taosArrayGet(pGroupResInfo->pRows, i);
|
||||
SFilePage* page = getBufPage(pBuf, pPos->pageId);
|
||||
SResKeyPos *pPos = taosArrayGetP(pGroupResInfo->pRows, i);
|
||||
SFilePage* page = getBufPage(pBuf, pPos->pos.pageId);
|
||||
|
||||
SResultRow* pRow = (SResultRow*)((char*)page + pPos->offset);
|
||||
SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);
|
||||
if (pRow->numOfRows == 0) {
|
||||
pGroupResInfo->index += 1;
|
||||
continue;
|
||||
|
@ -3181,7 +3122,7 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased
|
|||
|
||||
pCtx[j].resultInfo = getResultCell(pRow, j, rowCellOffset);
|
||||
if (pCtx[j].fpSet.process) {
|
||||
pCtx[j].fpSet.finalize(&pCtx[j], pBlock, slotId);
|
||||
pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
|
||||
} else {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
|
@ -3806,9 +3747,15 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
|
|||
blockDataEnsureCapacity(pRes, numOfRows);
|
||||
|
||||
if (pColList == NULL) { // data from other sources
|
||||
int32_t* colLen = (int32_t*)pData;
|
||||
char* pStart = pData + sizeof(int32_t) * numOfOutput;
|
||||
int32_t dataLen = *(int32_t*) pData;
|
||||
pData += sizeof(int32_t);
|
||||
|
||||
pRes->info.groupId = *(uint64_t*) pData;
|
||||
pData += sizeof(uint64_t);
|
||||
|
||||
int32_t* colLen = (int32_t*)pData;
|
||||
|
||||
char* pStart = pData + sizeof(int32_t) * numOfOutput;
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
colLen[i] = htonl(colLen[i]);
|
||||
ASSERT(colLen[i] > 0);
|
||||
|
@ -3861,7 +3808,11 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
|
|||
|
||||
blockDataEnsureCapacity(&block, numOfRows);
|
||||
|
||||
int32_t* colLen = (int32_t*)pStart;
|
||||
int32_t dataLen = *(int32_t*) pStart;
|
||||
uint64_t groupId = *(uint64_t*) (pStart + sizeof(int32_t));
|
||||
pStart += sizeof(int32_t) + sizeof(uint64_t);
|
||||
|
||||
int32_t* colLen = (int32_t*) (pStart);
|
||||
pStart += sizeof(int32_t) * numOfCols;
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
|
@ -3903,6 +3854,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
|
|||
}
|
||||
|
||||
pRes->info.rows = numOfRows;
|
||||
blockDataUpdateTsWindow(pRes);
|
||||
|
||||
int64_t el = taosGetTimestampUs() - startTs;
|
||||
|
||||
|
@ -4318,18 +4270,6 @@ static void assignExprInfo(SExprInfo* dst, const SExprInfo* src) {
|
|||
// }
|
||||
}
|
||||
|
||||
static SExprInfo* exprArrayDup(SArray* pExprList) {
|
||||
size_t numOfOutput = taosArrayGetSize(pExprList);
|
||||
|
||||
SExprInfo* p = taosMemoryCalloc(numOfOutput, sizeof(SExprInfo));
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
SExprInfo* pExpr = taosArrayGetP(pExprList, i);
|
||||
assignExprInfo(&p[i], pExpr);
|
||||
}
|
||||
|
||||
return p;
|
||||
}
|
||||
|
||||
// TODO merge aggregate super table
|
||||
static void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
|
||||
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||
|
@ -4822,7 +4762,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
|||
finalizeMultiTupleQueryResult(pAggInfo->binfo.pCtx, pOperator->numOfOutput, pAggInfo->aggSup.pResultBuf,
|
||||
&pAggInfo->binfo.resultRowInfo, pAggInfo->binfo.rowCellInfoOffset);
|
||||
|
||||
initGroupResInfo(&pAggInfo->groupResInfo, &pAggInfo->binfo.resultRowInfo);
|
||||
initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, false);
|
||||
OPTR_SET_OPENED(pOperator);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -5125,7 +5065,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
|
|||
STableQueryInfo* pTableQueryInfo = pInfo->pCurrent;
|
||||
|
||||
setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window);
|
||||
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
|
||||
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, pBlock->info.groupId);
|
||||
|
||||
#if 0 // test for encode/decode result info
|
||||
if(pOperator->encodeResultRow){
|
||||
|
@ -5147,7 +5087,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
|
|||
finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf,
|
||||
&pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
|
||||
|
||||
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
|
||||
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true);
|
||||
OPTR_SET_OPENED(pOperator);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -5276,7 +5216,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator, bool* newgroup) {
|
|||
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
|
||||
// finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfOutput);
|
||||
|
||||
initGroupResInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo);
|
||||
// initGroupedResultInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo);
|
||||
// doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes);
|
||||
|
||||
if (pSliceInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pSliceInfo->groupResInfo)) {
|
||||
|
@ -5327,7 +5267,7 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup
|
|||
finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf,
|
||||
&pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
|
||||
|
||||
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
|
||||
// initGroupedResultInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
|
||||
OPTR_SET_OPENED(pOperator);
|
||||
|
||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
|
@ -5458,7 +5398,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
|
|||
finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo,
|
||||
pBInfo->rowCellInfoOffset);
|
||||
|
||||
initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo);
|
||||
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true);
|
||||
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
||||
doBuildResultDatablock(pBInfo->pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf,
|
||||
pBInfo->rowCellInfoOffset, pInfo->binfo.pCtx);
|
||||
|
@ -5510,7 +5450,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup)
|
|||
finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo,
|
||||
pBInfo->rowCellInfoOffset);
|
||||
|
||||
initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo);
|
||||
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true);
|
||||
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
||||
doBuildResultDatablock(pBInfo->pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf,
|
||||
pBInfo->rowCellInfoOffset, pInfo->binfo.pCtx);
|
||||
|
@ -5699,6 +5639,11 @@ int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInf
|
|||
pBasicInfo->pRes = pResultBlock;
|
||||
|
||||
doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols, keyBufSize, pkey);
|
||||
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
pBasicInfo->pCtx[i].pBuf = pAggSup->pResultBuf;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -5847,11 +5792,6 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
|
||||
}
|
||||
|
||||
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
STagScanInfo* pInfo = (STagScanInfo*)param;
|
||||
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
||||
}
|
||||
|
||||
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param;
|
||||
pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock);
|
||||
|
@ -5941,11 +5881,12 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->order = TSDB_ORDER_ASC;
|
||||
pInfo->interval = *pInterval;
|
||||
pInfo->execModel = pTaskInfo->execModel;
|
||||
pInfo->win = pTaskInfo->window;
|
||||
pInfo->twAggSup = *pTwAggSupp;
|
||||
pInfo->order = TSDB_ORDER_ASC;
|
||||
pInfo->interval = *pInterval;
|
||||
// pInfo->execModel = OPTR_EXEC_MODEL_STREAM;
|
||||
pInfo->execModel = pTaskInfo->execModel;
|
||||
pInfo->win = pTaskInfo->window;
|
||||
pInfo->twAggSup = *pTwAggSupp;
|
||||
pInfo->primaryTsIndex = primaryTsSlotId;
|
||||
|
||||
int32_t numOfRows = 4096;
|
||||
|
@ -6204,157 +6145,6 @@ _error:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static SSDataBlock* doTagScan(SOperatorInfo* pOperator, bool* newgroup) {
|
||||
#if 0
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t maxNumOfTables = (int32_t)pResultInfo->capacity;
|
||||
|
||||
STagScanInfo *pInfo = pOperator->info;
|
||||
SSDataBlock *pRes = pInfo->pRes;
|
||||
*newgroup = false;
|
||||
|
||||
int32_t count = 0;
|
||||
SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0);
|
||||
|
||||
int32_t functionId = getExprFunctionId(&pOperator->pExpr[0]);
|
||||
if (functionId == FUNCTION_TID_TAG) { // return the tags & table Id
|
||||
assert(pQueryAttr->numOfOutput == 1);
|
||||
|
||||
SExprInfo* pExprInfo = &pOperator->pExpr[0];
|
||||
int32_t rsize = pExprInfo->base.resSchema.bytes;
|
||||
|
||||
count = 0;
|
||||
|
||||
int16_t bytes = pExprInfo->base.resSchema.bytes;
|
||||
int16_t type = pExprInfo->base.resSchema.type;
|
||||
|
||||
for(int32_t i = 0; i < pQueryAttr->numOfTags; ++i) {
|
||||
if (pQueryAttr->tagColList[i].colId == pExprInfo->base.pColumns->info.colId) {
|
||||
bytes = pQueryAttr->tagColList[i].bytes;
|
||||
type = pQueryAttr->tagColList[i].type;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, 0);
|
||||
|
||||
while(pInfo->curPos < pInfo->totalTables && count < maxNumOfTables) {
|
||||
int32_t i = pInfo->curPos++;
|
||||
STableQueryInfo *item = taosArrayGetP(pa, i);
|
||||
|
||||
char *output = pColInfo->pData + count * rsize;
|
||||
varDataSetLen(output, rsize - VARSTR_HEADER_SIZE);
|
||||
|
||||
output = varDataVal(output);
|
||||
STableId* id = TSDB_TABLEID(item->pTable);
|
||||
|
||||
*(int16_t *)output = 0;
|
||||
output += sizeof(int16_t);
|
||||
|
||||
*(int64_t *)output = id->uid; // memory align problem, todo serialize
|
||||
output += sizeof(id->uid);
|
||||
|
||||
*(int32_t *)output = id->tid;
|
||||
output += sizeof(id->tid);
|
||||
|
||||
*(int32_t *)output = pQueryAttr->vgId;
|
||||
output += sizeof(pQueryAttr->vgId);
|
||||
|
||||
char* data = NULL;
|
||||
if (pExprInfo->base.pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
data = tsdbGetTableName(item->pTable);
|
||||
} else {
|
||||
data = tsdbGetTableTagVal(item->pTable, pExprInfo->base.pColumns->info.colId, type, bytes);
|
||||
}
|
||||
|
||||
doSetTagValueToResultBuf(output, data, type, bytes);
|
||||
count += 1;
|
||||
}
|
||||
|
||||
//qDebug("QInfo:0x%"PRIx64" create (tableId, tag) info completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
|
||||
} else if (functionId == FUNCTION_COUNT) {// handle the "count(tbname)" query
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, 0);
|
||||
*(int64_t*)pColInfo->pData = pInfo->totalTables;
|
||||
count = 1;
|
||||
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
//qDebug("QInfo:0x%"PRIx64" create count(tbname) query, res:%d rows:1", GET_TASKID(pRuntimeEnv), count);
|
||||
} else { // return only the tags|table name etc.
|
||||
SExprInfo* pExprInfo = &pOperator->pExpr[0]; // todo use the column list instead of exprinfo
|
||||
|
||||
count = 0;
|
||||
while(pInfo->curPos < pInfo->totalTables && count < maxNumOfTables) {
|
||||
int32_t i = pInfo->curPos++;
|
||||
|
||||
STableQueryInfo* item = taosArrayGetP(pa, i);
|
||||
|
||||
char *data = NULL, *dst = NULL;
|
||||
int16_t type = 0, bytes = 0;
|
||||
for(int32_t j = 0; j < pOperator->numOfOutput; ++j) {
|
||||
// not assign value in case of user defined constant output column
|
||||
if (TSDB_COL_IS_UD_COL(pExprInfo[j].base.pColumns->flag)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, j);
|
||||
type = pExprInfo[j].base.resSchema.type;
|
||||
bytes = pExprInfo[j].base.resSchema.bytes;
|
||||
|
||||
if (pExprInfo[j].base.pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
data = tsdbGetTableName(item->pTable);
|
||||
} else {
|
||||
data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.pColumns->info.colId, type, bytes);
|
||||
}
|
||||
|
||||
dst = pColInfo->pData + count * pExprInfo[j].base.resSchema.bytes;
|
||||
doSetTagValueToResultBuf(dst, data, type, bytes);
|
||||
}
|
||||
|
||||
count += 1;
|
||||
}
|
||||
|
||||
if (pInfo->curPos >= pInfo->totalTables) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
}
|
||||
|
||||
//qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
|
||||
}
|
||||
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
setTaskStatus(pOperator->pRuntimeEnv, TASK_COMPLETED);
|
||||
}
|
||||
|
||||
pRes->info.rows = count;
|
||||
return (pRes->info.rows == 0)? NULL:pInfo->pRes;
|
||||
|
||||
#endif
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) {
|
||||
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
|
||||
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
|
||||
assert(numOfGroup == 0 || numOfGroup == 1);
|
||||
|
||||
pInfo->curPos = 0;
|
||||
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
pOperator->name = "SeqTableTagScan";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
|
||||
pOperator->blockingOptr = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->getNextFn = doTagScan;
|
||||
pOperator->pExpr = pExpr;
|
||||
pOperator->numOfOutput = numOfOutput;
|
||||
pOperator->closeFn = destroyTagScanOperatorInfo;
|
||||
|
||||
return pOperator;
|
||||
}
|
||||
|
||||
static int32_t getColumnIndexInSource(SQueriedTableInfo* pTableInfo, SExprBasicInfo* pExpr, SColumnInfo* pTagCols) {
|
||||
int32_t j = 0;
|
||||
|
@ -6562,6 +6352,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
|
||||
int32_t numOfCols = 0;
|
||||
tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
|
||||
if (pDataReader == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SArray* pColList =
|
||||
extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
|
||||
|
@ -6576,8 +6369,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
};
|
||||
|
||||
return createTableScanOperatorInfo(pDataReader, pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC,
|
||||
numOfCols, pTableScanNode->dataRequired, pTableScanNode->scanSeq[0], pTableScanNode->scanSeq[1], pColList,
|
||||
pResBlock, pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, pTaskInfo);
|
||||
numOfCols, pTableScanNode->dataRequired, pTableScanNode->scanSeq[0], pTableScanNode->scanSeq[1], pColList,
|
||||
pResBlock, pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
|
||||
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
|
||||
SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc);
|
||||
|
@ -6585,17 +6378,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
|
||||
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
|
||||
|
||||
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo,
|
||||
queryId, taskId);
|
||||
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
|
||||
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
|
||||
|
||||
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
SArray* pColList =
|
||||
extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
|
||||
SOperatorInfo* pOperator =
|
||||
createStreamScanOperatorInfo(pHandle->reader, pResBlock, pColList, tableIdList, pTaskInfo);
|
||||
SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
|
||||
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, pCols, tableIdList, pTaskInfo);
|
||||
taosArrayDestroy(tableIdList);
|
||||
return pOperator;
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
|
||||
|
@ -6621,6 +6411,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
for (int32_t i = 0; i < size; ++i) {
|
||||
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
|
||||
ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
|
||||
if (ops[i] == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
SOperatorInfo* pOptr = NULL;
|
||||
|
@ -6988,8 +6781,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
|
|||
goto _complete;
|
||||
}
|
||||
|
||||
STableGroupInfo group = {0};
|
||||
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &group);
|
||||
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoGroupInfo);
|
||||
if (NULL == (*pTaskInfo)->pRoot) {
|
||||
code = terrno;
|
||||
goto _complete;
|
||||
|
|
|
@ -308,7 +308,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
|
|||
// }
|
||||
|
||||
blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
|
||||
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
|
||||
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, false);
|
||||
|
||||
while(1) {
|
||||
doBuildResultDatablock(pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset, pInfo->binfo.pCtx);
|
||||
|
|
|
@ -271,9 +271,7 @@ static void setupEnvForReverseScan(STableScanInfo* pTableScanInfo, SqlFunctionCt
|
|||
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
|
||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||
|
||||
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
||||
STableGroupInfo* pTableGroupInfo = &pOperator->pTaskInfo->tableqinfoGroupInfo;
|
||||
|
||||
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
||||
*newgroup = false;
|
||||
|
||||
while (tsdbNextDataBlock(pTableScanInfo->dataReader)) {
|
||||
|
@ -284,18 +282,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
|
|||
pTableScanInfo->numOfBlocks += 1;
|
||||
tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBlock->info);
|
||||
|
||||
// todo opt
|
||||
// if (pTableGroupInfo->numOfTables > 1 || (pRuntimeEnv->current == NULL && pTableGroupInfo->numOfTables == 1)) {
|
||||
// STableQueryInfo** pTableQueryInfo =
|
||||
// (STableQueryInfo**)taosHashGet(pTableGroupInfo->map, &pBlock->info.uid, sizeof(pBlock->info.uid));
|
||||
// if (pTableQueryInfo == NULL) {
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// doTableQueryInfoTimeWindowCheck(pTaskInfo, *pTableQueryInfo, pTableScanInfo->order);
|
||||
// }
|
||||
|
||||
// this function never returns error?
|
||||
uint32_t status = 0;
|
||||
int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status);
|
||||
// int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
|
||||
|
@ -308,6 +294,8 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
|
|||
continue;
|
||||
}
|
||||
|
||||
// reset the block to be 0 by default, this blockId is assigned by physical plan and is used by direct upstream operator.
|
||||
pBlock->info.blockId = 0;
|
||||
return pBlock;
|
||||
}
|
||||
|
||||
|
@ -405,11 +393,11 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, int32_t order, int
|
|||
pOperator->name = "TableScanOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
|
||||
pOperator->blockingOptr = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->numOfOutput = numOfOutput;
|
||||
pOperator->getNextFn = doTableScan;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->numOfOutput = numOfOutput;
|
||||
pOperator->getNextFn = doTableScan;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
static int32_t cost = 0;
|
||||
pOperator->cost.openCost = ++cost;
|
||||
|
@ -824,12 +812,12 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) {
|
|||
int32_t tableNameSlotId = 1;
|
||||
SColumnInfoData* pTableNameCol = taosArrayGet(pInfo->pRes->pDataBlock, tableNameSlotId);
|
||||
|
||||
char* name = NULL;
|
||||
char* tb = NULL;
|
||||
int32_t numOfRows = 0;
|
||||
|
||||
char n[TSDB_TABLE_NAME_LEN] = {0};
|
||||
while ((name = metaTbCursorNext(pInfo->pCur)) != NULL) {
|
||||
STR_TO_VARSTR(n, name);
|
||||
while ((tb = metaTbCursorNext(pInfo->pCur)) != NULL) {
|
||||
STR_TO_VARSTR(n, tb);
|
||||
colDataAppend(pTableNameCol, numOfRows, n, false);
|
||||
numOfRows += 1;
|
||||
if (numOfRows >= pInfo->capacity) {
|
||||
|
@ -992,3 +980,167 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
|
|||
|
||||
return pOperator;
|
||||
}
|
||||
|
||||
static SSDataBlock* doTagScan(SOperatorInfo* pOperator, bool* newgroup) {
|
||||
#if 0
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t maxNumOfTables = (int32_t)pResultInfo->capacity;
|
||||
|
||||
STagScanInfo *pInfo = pOperator->info;
|
||||
SSDataBlock *pRes = pInfo->pRes;
|
||||
*newgroup = false;
|
||||
|
||||
int32_t count = 0;
|
||||
SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0);
|
||||
|
||||
int32_t functionId = getExprFunctionId(&pOperator->pExpr[0]);
|
||||
if (functionId == FUNCTION_TID_TAG) { // return the tags & table Id
|
||||
assert(pQueryAttr->numOfOutput == 1);
|
||||
|
||||
SExprInfo* pExprInfo = &pOperator->pExpr[0];
|
||||
int32_t rsize = pExprInfo->base.resSchema.bytes;
|
||||
|
||||
count = 0;
|
||||
|
||||
int16_t bytes = pExprInfo->base.resSchema.bytes;
|
||||
int16_t type = pExprInfo->base.resSchema.type;
|
||||
|
||||
for(int32_t i = 0; i < pQueryAttr->numOfTags; ++i) {
|
||||
if (pQueryAttr->tagColList[i].colId == pExprInfo->base.pColumns->info.colId) {
|
||||
bytes = pQueryAttr->tagColList[i].bytes;
|
||||
type = pQueryAttr->tagColList[i].type;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, 0);
|
||||
|
||||
while(pInfo->curPos < pInfo->totalTables && count < maxNumOfTables) {
|
||||
int32_t i = pInfo->curPos++;
|
||||
STableQueryInfo *item = taosArrayGetP(pa, i);
|
||||
|
||||
char *output = pColInfo->pData + count * rsize;
|
||||
varDataSetLen(output, rsize - VARSTR_HEADER_SIZE);
|
||||
|
||||
output = varDataVal(output);
|
||||
STableId* id = TSDB_TABLEID(item->pTable);
|
||||
|
||||
*(int16_t *)output = 0;
|
||||
output += sizeof(int16_t);
|
||||
|
||||
*(int64_t *)output = id->uid; // memory align problem, todo serialize
|
||||
output += sizeof(id->uid);
|
||||
|
||||
*(int32_t *)output = id->tid;
|
||||
output += sizeof(id->tid);
|
||||
|
||||
*(int32_t *)output = pQueryAttr->vgId;
|
||||
output += sizeof(pQueryAttr->vgId);
|
||||
|
||||
char* data = NULL;
|
||||
if (pExprInfo->base.pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
data = tsdbGetTableName(item->pTable);
|
||||
} else {
|
||||
data = tsdbGetTableTagVal(item->pTable, pExprInfo->base.pColumns->info.colId, type, bytes);
|
||||
}
|
||||
|
||||
doSetTagValueToResultBuf(output, data, type, bytes);
|
||||
count += 1;
|
||||
}
|
||||
|
||||
//qDebug("QInfo:0x%"PRIx64" create (tableId, tag) info completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
|
||||
} else if (functionId == FUNCTION_COUNT) {// handle the "count(tbname)" query
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, 0);
|
||||
*(int64_t*)pColInfo->pData = pInfo->totalTables;
|
||||
count = 1;
|
||||
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
//qDebug("QInfo:0x%"PRIx64" create count(tbname) query, res:%d rows:1", GET_TASKID(pRuntimeEnv), count);
|
||||
} else { // return only the tags|table name etc.
|
||||
SExprInfo* pExprInfo = &pOperator->pExpr[0]; // todo use the column list instead of exprinfo
|
||||
|
||||
count = 0;
|
||||
while(pInfo->curPos < pInfo->totalTables && count < maxNumOfTables) {
|
||||
int32_t i = pInfo->curPos++;
|
||||
|
||||
STableQueryInfo* item = taosArrayGetP(pa, i);
|
||||
|
||||
char *data = NULL, *dst = NULL;
|
||||
int16_t type = 0, bytes = 0;
|
||||
for(int32_t j = 0; j < pOperator->numOfOutput; ++j) {
|
||||
// not assign value in case of user defined constant output column
|
||||
if (TSDB_COL_IS_UD_COL(pExprInfo[j].base.pColumns->flag)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, j);
|
||||
type = pExprInfo[j].base.resSchema.type;
|
||||
bytes = pExprInfo[j].base.resSchema.bytes;
|
||||
|
||||
if (pExprInfo[j].base.pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
data = tsdbGetTableName(item->pTable);
|
||||
} else {
|
||||
data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.pColumns->info.colId, type, bytes);
|
||||
}
|
||||
|
||||
dst = pColInfo->pData + count * pExprInfo[j].base.resSchema.bytes;
|
||||
doSetTagValueToResultBuf(dst, data, type, bytes);
|
||||
}
|
||||
|
||||
count += 1;
|
||||
}
|
||||
|
||||
if (pInfo->curPos >= pInfo->totalTables) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
}
|
||||
|
||||
//qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
|
||||
}
|
||||
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
setTaskStatus(pOperator->pRuntimeEnv, TASK_COMPLETED);
|
||||
}
|
||||
|
||||
pRes->info.rows = count;
|
||||
return (pRes->info.rows == 0)? NULL:pInfo->pRes;
|
||||
|
||||
#endif
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
STagScanInfo* pInfo = (STagScanInfo*)param;
|
||||
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
||||
}
|
||||
|
||||
SOperatorInfo* createTagScanOperatorInfo(void* pReaderHandle, SExprInfo* pExpr, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) {
|
||||
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->pReader = pReaderHandle;
|
||||
pInfo->curPos = 0;
|
||||
pOperator->name = "TagScanOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
|
||||
pOperator->blockingOptr = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->getNextFn = doTagScan;
|
||||
pOperator->pExpr = pExpr;
|
||||
pOperator->numOfOutput = numOfOutput;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->closeFn = destroyTagScanOperatorInfo;
|
||||
|
||||
return pOperator;
|
||||
_error:
|
||||
taosMemoryFree(pInfo);
|
||||
taosMemoryFree(pOperator);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -24,7 +24,7 @@ extern "C" {
|
|||
#include "functionMgt.h"
|
||||
|
||||
bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId);
|
||||
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
|
||||
EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
|
||||
bool getCountFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
|
@ -43,17 +43,17 @@ int32_t maxFunction(SqlFunctionCtx *pCtx);
|
|||
bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
int32_t avgFunction(SqlFunctionCtx* pCtx);
|
||||
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId);
|
||||
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
|
||||
bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
int32_t stddevFunction(SqlFunctionCtx* pCtx);
|
||||
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId);
|
||||
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
|
||||
bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
int32_t percentileFunction(SqlFunctionCtx *pCtx);
|
||||
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId);
|
||||
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
|
||||
bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
|
||||
|
@ -65,7 +65,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx);
|
|||
|
||||
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
|
||||
int32_t topFunction(SqlFunctionCtx *pCtx);
|
||||
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId);
|
||||
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
*/
|
||||
|
||||
#include "builtinsimpl.h"
|
||||
#include <libs/nodes/querynodes.h>
|
||||
#include "function.h"
|
||||
#include "querynodes.h"
|
||||
#include "taggfunction.h"
|
||||
#include "tdatablock.h"
|
||||
|
@ -44,8 +44,6 @@ typedef struct STopBotResItem {
|
|||
} STopBotResItem;
|
||||
|
||||
typedef struct STopBotRes {
|
||||
int32_t pageId;
|
||||
// int32_t num;
|
||||
STopBotResItem *pItems;
|
||||
} STopBotRes;
|
||||
|
||||
|
@ -92,18 +90,6 @@ typedef struct SDiffInfo {
|
|||
} \
|
||||
} while (0);
|
||||
|
||||
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
|
||||
do { \
|
||||
for (int32_t _i = 0; _i < (ctx)->subsidiaryRes.numOfCols; ++_i) { \
|
||||
SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \
|
||||
if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
|
||||
__ctx->tag.i = (ts); \
|
||||
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
|
||||
} \
|
||||
__ctx->fpSet.process(__ctx); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
|
||||
do { \
|
||||
if (((left) < (right)) ^ (sign)) { \
|
||||
|
@ -139,7 +125,8 @@ bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
|||
return true;
|
||||
}
|
||||
|
||||
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
|
||||
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
|
@ -406,7 +393,7 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
|
||||
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
int32_t type = pInput->pData[0]->info.type;
|
||||
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
|
@ -416,7 +403,7 @@ int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
|
|||
pAvgRes->result = pAvgRes->sum.dsum / ((double) pAvgRes->count);
|
||||
}
|
||||
|
||||
return functionFinalize(pCtx, pBlock, slotId);
|
||||
return functionFinalize(pCtx, pBlock);
|
||||
}
|
||||
|
||||
EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow){
|
||||
|
@ -521,6 +508,49 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
|||
return true;
|
||||
}
|
||||
|
||||
#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList))
|
||||
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])
|
||||
|
||||
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \
|
||||
do { \
|
||||
for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
|
||||
SqlFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
|
||||
__ctx->fpSet.process(__ctx); \
|
||||
} \
|
||||
} while (0);
|
||||
|
||||
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
|
||||
do { \
|
||||
for (int32_t _i = 0; _i < (ctx)->subsidiaries.num; ++_i) { \
|
||||
SqlFunctionCtx* __ctx = (ctx)->subsidiaries.pCtx[_i]; \
|
||||
if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
|
||||
__ctx->tag.i = (ts); \
|
||||
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
|
||||
} \
|
||||
__ctx->fpSet.process(__ctx); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
|
||||
do { \
|
||||
if (((left) < (right)) ^ (sign)) { \
|
||||
(left) = (right); \
|
||||
DO_UPDATE_SUBSID_RES(ctx, _ts); \
|
||||
(num) += 1; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \
|
||||
do { \
|
||||
_t *d = (_t *)((_col)->pData); \
|
||||
for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \
|
||||
if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
|
||||
continue; \
|
||||
} \
|
||||
TSKEY ts = (ctx)->ptsList != NULL ? GET_TS_DATA(ctx, i) : 0; \
|
||||
UPDATE_DATA(ctx, val, d[i], num, sign, ts); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
|
||||
int32_t numOfElems = 0;
|
||||
|
@ -564,8 +594,8 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
|
|||
int64_t val = GET_INT64_VAL(tval);
|
||||
if ((prev < val) ^ isMinFunc) {
|
||||
*(int64_t*) buf = val;
|
||||
for (int32_t i = 0; i < (pCtx)->subsidiaryRes.numOfCols; ++i) {
|
||||
SqlFunctionCtx* __ctx = pCtx->subsidiaryRes.pCtx[i];
|
||||
for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) {
|
||||
SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i];
|
||||
if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor
|
||||
__ctx->tag.i = key;
|
||||
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
|
||||
|
@ -581,8 +611,8 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
|
|||
uint64_t val = GET_UINT64_VAL(tval);
|
||||
if ((prev < val) ^ isMinFunc) {
|
||||
*(uint64_t*) buf = val;
|
||||
for (int32_t i = 0; i < (pCtx)->subsidiaryRes.numOfCols; ++i) {
|
||||
SqlFunctionCtx* __ctx = pCtx->subsidiaryRes.pCtx[i];
|
||||
for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) {
|
||||
SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i];
|
||||
if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor
|
||||
__ctx->tag.i = key;
|
||||
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
|
||||
|
@ -797,7 +827,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
|
||||
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
int32_t type = pInput->pData[0]->info.type;
|
||||
SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
|
@ -810,7 +840,7 @@ int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId
|
|||
pStddevRes->result = sqrt(pStddevRes->quadraticDSum/((double)pStddevRes->count) - avg*avg);
|
||||
}
|
||||
|
||||
return functionFinalize(pCtx, pBlock, slotId);
|
||||
return functionFinalize(pCtx, pBlock);
|
||||
}
|
||||
|
||||
bool getPercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
|
@ -923,7 +953,7 @@ int32_t percentileFunction(SqlFunctionCtx *pCtx) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
|
||||
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SVariant* pVal = &pCtx->param[1].param;
|
||||
double v = pVal->nType == TSDB_DATA_TYPE_INT ? pVal->i : pVal->d;
|
||||
|
||||
|
@ -936,7 +966,7 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t sl
|
|||
}
|
||||
|
||||
tMemBucketDestroy(pMemBucket);
|
||||
return functionFinalize(pCtx, pBlock, slotId);
|
||||
return functionFinalize(pCtx, pBlock);
|
||||
}
|
||||
|
||||
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
|
@ -1353,15 +1383,16 @@ static STopBotRes *getTopBotOutputInfo(SqlFunctionCtx *pCtx) {
|
|||
return pRes;
|
||||
}
|
||||
|
||||
static void doAddIntoResult(STopBotRes* pRes, int32_t maxSize, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock,
|
||||
static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock,
|
||||
uint16_t type, uint64_t uid, SResultRowEntryInfo* pEntryInfo);
|
||||
|
||||
static void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem);
|
||||
static void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem);
|
||||
|
||||
int32_t topFunction(SqlFunctionCtx *pCtx) {
|
||||
int32_t numOfElems = 0;
|
||||
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
|
||||
STopBotRes *pRes = getTopBotOutputInfo(pCtx);
|
||||
|
||||
// if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotRes) + POINTER_BYTES * pCtx->param[0].i)) {
|
||||
// buildTopBotStruct(pRes, pCtx);
|
||||
// }
|
||||
|
@ -1381,7 +1412,7 @@ int32_t topFunction(SqlFunctionCtx *pCtx) {
|
|||
numOfElems++;
|
||||
|
||||
char* data = colDataGetData(pCol, i);
|
||||
doAddIntoResult(pRes, pCtx->param[1].param.i, data, i, NULL, type, pInput->uid, pResInfo);
|
||||
doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1414,9 +1445,11 @@ static int32_t topBotResComparFn(const void *p1, const void *p2, const void *par
|
|||
return (val1->v.d > val2->v.d) ? 1 : -1;
|
||||
}
|
||||
|
||||
void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
|
||||
uint64_t uid, SResultRowEntryInfo* pEntryInfo) {
|
||||
STopBotRes *pRes = getTopBotOutputInfo(pCtx);
|
||||
int32_t maxSize = pCtx->param[1].param.i;
|
||||
|
||||
void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
|
||||
uint64_t uid, SResultRowEntryInfo* pEntryInfo) {
|
||||
SVariant val = {0};
|
||||
taosVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type);
|
||||
|
||||
|
@ -1428,22 +1461,9 @@ void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, int32_t row
|
|||
STopBotResItem* pItem = &pItems[pEntryInfo->numOfRes];
|
||||
pItem->v = val;
|
||||
pItem->uid = uid;
|
||||
pItem->tuplePos.pageId = -1; // todo set the corresponding tuple data in the disk-based buffer
|
||||
|
||||
if (pRes->pageId == -1) {
|
||||
SFilePage* pPage = getNewBufPage(NULL, 0, &pRes->pageId);
|
||||
pPage->num = sizeof(SFilePage);
|
||||
|
||||
// keep the current row data
|
||||
for(int32_t i = 0; i < pSrcBlock->info.numOfCols; ++i) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i);
|
||||
bool isNull = colDataIsNull_s(pCol, rowIndex);
|
||||
|
||||
|
||||
colDataGetData(pCol, rowIndex);
|
||||
}
|
||||
|
||||
}
|
||||
// save the data of this tuple
|
||||
saveTupleData(pCtx, rowIndex, pSrcBlock, pItem);
|
||||
|
||||
// allocate the buffer and keep the data of this row into the new allocated buffer
|
||||
pEntryInfo->numOfRes++;
|
||||
|
@ -1452,22 +1472,100 @@ void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, int32_t row
|
|||
if ((IS_SIGNED_NUMERIC_TYPE(type) && val.i > pItems[0].v.i) ||
|
||||
(IS_UNSIGNED_NUMERIC_TYPE(type) && val.u > pItems[0].v.u) ||
|
||||
(IS_FLOAT_TYPE(type) && val.d > pItems[0].v.d)) {
|
||||
// replace the old data and the coresponding tuple data
|
||||
STopBotResItem* pItem = &pItems[0];
|
||||
pItem->v = val;
|
||||
pItem->uid = uid;
|
||||
pItem->tuplePos.pageId = -1; // todo set the corresponding tuple data in the disk-based buffer
|
||||
|
||||
// save the data of this tuple by over writing the old data
|
||||
copyTupleData(pCtx, rowIndex, pSrcBlock, pItem);
|
||||
|
||||
taosheapadjust((void *) pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void *) &type, topBotResComparFn, NULL, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
|
||||
void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem) {
|
||||
SFilePage* pPage = NULL;
|
||||
|
||||
int32_t completeRowSize = pSrcBlock->info.rowSize + pSrcBlock->info.numOfCols * sizeof(bool);
|
||||
|
||||
if (pCtx->curBufPage == -1) {
|
||||
pPage = getNewBufPage(pCtx->pBuf, 0, &pCtx->curBufPage);
|
||||
pPage->num = sizeof(SFilePage);
|
||||
} else {
|
||||
pPage = getBufPage(pCtx->pBuf, pCtx->curBufPage);
|
||||
if (pPage->num + completeRowSize > getBufPageSize(pCtx->pBuf)) {
|
||||
pPage = getNewBufPage(pCtx->pBuf, 0, &pCtx->curBufPage);
|
||||
pPage->num = sizeof(SFilePage);
|
||||
}
|
||||
}
|
||||
|
||||
pItem->tuplePos.pageId = pCtx->curBufPage;
|
||||
|
||||
// keep the current row data, extract method
|
||||
int32_t offset = 0;
|
||||
bool* nullList = (bool*)((char*)pPage + pPage->num);
|
||||
char* pStart = (char*)(nullList + sizeof(bool) * pSrcBlock->info.numOfCols);
|
||||
for (int32_t i = 0; i < pSrcBlock->info.numOfCols; ++i) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i);
|
||||
bool isNull = colDataIsNull_s(pCol, rowIndex);
|
||||
if (isNull) {
|
||||
nullList[i] = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
char* p = colDataGetData(pCol, rowIndex);
|
||||
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
|
||||
memcpy(pStart + offset, p, varDataTLen(p));
|
||||
} else {
|
||||
memcpy(pStart + offset, p, pCol->info.bytes);
|
||||
}
|
||||
|
||||
offset += pCol->info.bytes;
|
||||
}
|
||||
|
||||
pItem->tuplePos.offset = pPage->num;
|
||||
pPage->num += completeRowSize;
|
||||
|
||||
setBufPageDirty(pPage, true);
|
||||
releaseBufPage(pCtx->pBuf, pPage);
|
||||
}
|
||||
|
||||
void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem) {
|
||||
SFilePage* pPage = getBufPage(pCtx->pBuf, pItem->tuplePos.pageId);
|
||||
|
||||
bool* nullList = (bool*)((char*)pPage + pItem->tuplePos.offset);
|
||||
char* pStart = (char*)(nullList + pSrcBlock->info.numOfCols * sizeof(bool));
|
||||
|
||||
int32_t offset = 0;
|
||||
for(int32_t i = 0; i < pSrcBlock->info.numOfCols; ++i) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i);
|
||||
if ((nullList[i] = colDataIsNull_s(pCol, rowIndex)) == true) {
|
||||
continue;
|
||||
}
|
||||
|
||||
char* p = colDataGetData(pCol, rowIndex);
|
||||
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
|
||||
memcpy(pStart + offset, p, varDataTLen(p));
|
||||
} else {
|
||||
memcpy(pStart + offset, p, pCol->info.bytes);
|
||||
}
|
||||
|
||||
offset += pCol->info.bytes;
|
||||
}
|
||||
|
||||
setBufPageDirty(pPage, true);
|
||||
releaseBufPage(pCtx->pBuf, pPage);
|
||||
}
|
||||
|
||||
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SResultRowEntryInfo *pEntryInfo = GET_RES_INFO(pCtx);
|
||||
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||
pEntryInfo->complete = true;
|
||||
|
||||
int32_t type = pCtx->input.pData[0]->info.type;
|
||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
// todo assign the tag value and the corresponding row data
|
||||
|
@ -1476,19 +1574,45 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId
|
|||
case TSDB_DATA_TYPE_INT: {
|
||||
for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) {
|
||||
STopBotResItem* pItem = &pRes->pItems[i];
|
||||
colDataAppendInt32(pCol, currentRow++, (int32_t*)&pItem->v.i);
|
||||
colDataAppendInt32(pCol, currentRow, (int32_t*)&pItem->v.i);
|
||||
|
||||
int32_t pageId = pItem->tuplePos.pageId;
|
||||
int32_t offset = pItem->tuplePos.offset;
|
||||
if (pageId != -1) {
|
||||
// todo
|
||||
if (pItem->tuplePos.pageId != -1) {
|
||||
SFilePage* pPage = getBufPage(pCtx->pBuf, pageId);
|
||||
|
||||
bool* nullList = (bool*)((char*)pPage + offset);
|
||||
char* pStart = (char*)(nullList + pCtx->pSrcBlock->info.numOfCols * sizeof(bool));
|
||||
|
||||
// todo set the offset value to optimize the performance.
|
||||
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
|
||||
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
||||
|
||||
SFunctParam *pFuncParam = &pc->pExpr->base.pParam[0];
|
||||
int32_t srcSlotId = pFuncParam->pCol->slotId;
|
||||
int32_t dstSlotId = pCtx->pExpr->base.resSchema.slotId;
|
||||
|
||||
int32_t ps = 0;
|
||||
for(int32_t k = 0; k < srcSlotId; ++k) {
|
||||
SColumnInfoData* pSrcCol = taosArrayGet(pCtx->pSrcBlock->pDataBlock, k);
|
||||
ps += pSrcCol->info.bytes;
|
||||
}
|
||||
|
||||
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
||||
if (nullList[srcSlotId]) {
|
||||
colDataAppendNULL(pDstCol, currentRow);
|
||||
} else {
|
||||
colDataAppend(pDstCol, currentRow, (pStart + ps), false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
currentRow += 1;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return pEntryInfo->numOfRes;
|
||||
|
||||
// return functionFinalize(pCtx, pBlock, slotId);
|
||||
}
|
||||
|
|
|
@ -3673,7 +3673,9 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pQuery->precision = extractResultTsPrecision((SSelectStmt*)pQuery->pRoot);
|
||||
if (nodeType(pQuery->pRoot) == QUERY_NODE_SELECT_STMT) {
|
||||
pQuery->precision = extractResultTsPrecision((SSelectStmt*)pQuery->pRoot);
|
||||
}
|
||||
}
|
||||
|
||||
if (NULL != pCxt->pDbs) {
|
||||
|
|
|
@ -476,6 +476,7 @@ void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t
|
|||
return (void*)buf;
|
||||
}
|
||||
|
||||
// todo remove it
|
||||
// order array<type *>
|
||||
void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* param) {
|
||||
taosArrayGetSize(pArray) > 8 ? taosArrayQuickSort(pArray, fn, param) : taosArrayInsertSort(pArray, fn, param);
|
||||
|
|
|
@ -115,19 +115,19 @@ endi
|
|||
if $data00 != 10 then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != 0 then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != 10 then
|
||||
return -1
|
||||
endi
|
||||
if $data11 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data90 != 10 then
|
||||
return -1
|
||||
endi
|
||||
if $data91 != 9 then
|
||||
if $data01 != 7 then
|
||||
return -1
|
||||
endi
|
||||
if $data11 != 6 then
|
||||
return -1
|
||||
endi
|
||||
if $data91 != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
@ -143,16 +143,16 @@ if $row != 10 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != @22-01-01 00:00:00.000@ then
|
||||
if $data00 != @22-01-01 00:00:00.007@ then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != 0 then
|
||||
if $data01 != 7 then
|
||||
return -1
|
||||
endi
|
||||
if $data90 != @22-01-01 00:00:00.009@ then
|
||||
if $data90 != @22-01-01 00:00:00.003@ then
|
||||
return -1
|
||||
endi
|
||||
if $data91 != 9 then
|
||||
if $data91 != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
|
|
@ -53,6 +53,7 @@ endi
|
|||
sql select count(tbcol) from $mt
|
||||
print select count(tbcol) from $mt ===> $data00
|
||||
if $data00 != $totalNum then
|
||||
print expect $totalNum , actual: $data00
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
|
Loading…
Reference in New Issue