Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/data_format
This commit is contained in:
commit
cb3185a9cc
|
@ -51,6 +51,7 @@ extern int32_t tsVnodeShmSize;
|
||||||
extern int32_t tsQnodeShmSize;
|
extern int32_t tsQnodeShmSize;
|
||||||
extern int32_t tsSnodeShmSize;
|
extern int32_t tsSnodeShmSize;
|
||||||
extern int32_t tsBnodeShmSize;
|
extern int32_t tsBnodeShmSize;
|
||||||
|
extern int32_t tsNumOfShmThreads;
|
||||||
|
|
||||||
// queue & threads
|
// queue & threads
|
||||||
extern int32_t tsNumOfRpcThreads;
|
extern int32_t tsNumOfRpcThreads;
|
||||||
|
@ -67,6 +68,7 @@ extern int32_t tsNumOfQnodeQueryThreads;
|
||||||
extern int32_t tsNumOfQnodeFetchThreads;
|
extern int32_t tsNumOfQnodeFetchThreads;
|
||||||
extern int32_t tsNumOfSnodeSharedThreads;
|
extern int32_t tsNumOfSnodeSharedThreads;
|
||||||
extern int32_t tsNumOfSnodeUniqueThreads;
|
extern int32_t tsNumOfSnodeUniqueThreads;
|
||||||
|
extern int64_t tsRpcQueueMemoryAllowed;
|
||||||
|
|
||||||
// monitor
|
// monitor
|
||||||
extern bool tsEnableMonitor;
|
extern bool tsEnableMonitor;
|
||||||
|
|
|
@ -345,6 +345,7 @@ bool nodesIsUnaryOp(const SOperatorNode* pOp);
|
||||||
bool nodesIsArithmeticOp(const SOperatorNode* pOp);
|
bool nodesIsArithmeticOp(const SOperatorNode* pOp);
|
||||||
bool nodesIsComparisonOp(const SOperatorNode* pOp);
|
bool nodesIsComparisonOp(const SOperatorNode* pOp);
|
||||||
bool nodesIsJsonOp(const SOperatorNode* pOp);
|
bool nodesIsJsonOp(const SOperatorNode* pOp);
|
||||||
|
bool nodesIsRegularOp(const SOperatorNode* pOp);
|
||||||
|
|
||||||
bool nodesIsTimeorderQuery(const SNode* pQuery);
|
bool nodesIsTimeorderQuery(const SNode* pQuery);
|
||||||
bool nodesIsTimelineQuery(const SNode* pQuery);
|
bool nodesIsTimelineQuery(const SNode* pQuery);
|
||||||
|
|
|
@ -37,6 +37,7 @@ typedef int32_t TdUcs4;
|
||||||
#define wcstombs WCSTOMBS_FUNC_TAOS_FORBID
|
#define wcstombs WCSTOMBS_FUNC_TAOS_FORBID
|
||||||
#define wcsncpy WCSNCPY_FUNC_TAOS_FORBID
|
#define wcsncpy WCSNCPY_FUNC_TAOS_FORBID
|
||||||
#define wchar_t WCHAR_T_TYPE_TAOS_FORBID
|
#define wchar_t WCHAR_T_TYPE_TAOS_FORBID
|
||||||
|
#define strcasestr STR_CASE_STR_FORBID
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
|
@ -69,6 +70,8 @@ int32_t taosMbsToWchars(TdWchar *pWchars, const char *pStrs, int32_t size);
|
||||||
int32_t taosWcharToMb(char *pStr, TdWchar wchar);
|
int32_t taosWcharToMb(char *pStr, TdWchar wchar);
|
||||||
int32_t taosWcharsToMbs(char *pStrs, TdWchar *pWchars, int32_t size);
|
int32_t taosWcharsToMbs(char *pStrs, TdWchar *pWchars, int32_t size);
|
||||||
|
|
||||||
|
char *taosStrCaseStr(const char *str, const char *pattern);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -89,6 +89,7 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_REPEAT_INIT TAOS_DEF_ERROR_CODE(0, 0x0115)
|
#define TSDB_CODE_REPEAT_INIT TAOS_DEF_ERROR_CODE(0, 0x0115)
|
||||||
#define TSDB_CODE_DUP_KEY TAOS_DEF_ERROR_CODE(0, 0x0116)
|
#define TSDB_CODE_DUP_KEY TAOS_DEF_ERROR_CODE(0, 0x0116)
|
||||||
#define TSDB_CODE_NEED_RETRY TAOS_DEF_ERROR_CODE(0, 0x0117)
|
#define TSDB_CODE_NEED_RETRY TAOS_DEF_ERROR_CODE(0, 0x0117)
|
||||||
|
#define TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE TAOS_DEF_ERROR_CODE(0, 0x0118)
|
||||||
|
|
||||||
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0140)
|
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0140)
|
||||||
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0141)
|
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0141)
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
#define _TD_UTIL_PROCESS_H_
|
#define _TD_UTIL_PROCESS_H_
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
#include "tqueue.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
@ -25,7 +26,7 @@ extern "C" {
|
||||||
typedef enum { PROC_FUNC_REQ = 1, PROC_FUNC_RSP, PROC_FUNC_REGIST, PROC_FUNC_RELEASE } EProcFuncType;
|
typedef enum { PROC_FUNC_REQ = 1, PROC_FUNC_RSP, PROC_FUNC_REGIST, PROC_FUNC_RELEASE } EProcFuncType;
|
||||||
|
|
||||||
typedef struct SProcObj SProcObj;
|
typedef struct SProcObj SProcObj;
|
||||||
typedef void *(*ProcMallocFp)(int32_t contLen);
|
typedef void *(*ProcMallocFp)(int32_t contLen, EQItype itype);
|
||||||
typedef void *(*ProcFreeFp)(void *pCont);
|
typedef void *(*ProcFreeFp)(void *pCont);
|
||||||
typedef void (*ProcConsumeFp)(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen,
|
typedef void (*ProcConsumeFp)(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen,
|
||||||
EProcFuncType ftype);
|
EProcFuncType ftype);
|
||||||
|
|
|
@ -48,18 +48,24 @@ typedef struct {
|
||||||
int32_t threadNum;
|
int32_t threadNum;
|
||||||
} SQueueInfo;
|
} SQueueInfo;
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
DEF_QITEM = 0,
|
||||||
|
RPC_QITEM = 1,
|
||||||
|
} EQItype;
|
||||||
|
|
||||||
typedef void (*FItem)(SQueueInfo *pInfo, void *pItem);
|
typedef void (*FItem)(SQueueInfo *pInfo, void *pItem);
|
||||||
typedef void (*FItems)(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfItems);
|
typedef void (*FItems)(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfItems);
|
||||||
|
|
||||||
STaosQueue *taosOpenQueue();
|
STaosQueue *taosOpenQueue();
|
||||||
void taosCloseQueue(STaosQueue *queue);
|
void taosCloseQueue(STaosQueue *queue);
|
||||||
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp);
|
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp);
|
||||||
void *taosAllocateQitem(int32_t size);
|
void *taosAllocateQitem(int32_t size, EQItype itype);
|
||||||
void taosFreeQitem(void *pItem);
|
void taosFreeQitem(void *pItem);
|
||||||
void taosWriteQitem(STaosQueue *queue, void *pItem);
|
void taosWriteQitem(STaosQueue *queue, void *pItem);
|
||||||
int32_t taosReadQitem(STaosQueue *queue, void **ppItem);
|
int32_t taosReadQitem(STaosQueue *queue, void **ppItem);
|
||||||
bool taosQueueEmpty(STaosQueue *queue);
|
bool taosQueueEmpty(STaosQueue *queue);
|
||||||
int32_t taosQueueSize(STaosQueue *queue);
|
int32_t taosQueueItemSize(STaosQueue *queue);
|
||||||
|
int64_t taosQueueMemorySize(STaosQueue *queue);
|
||||||
|
|
||||||
STaosQall *taosAllocateQall();
|
STaosQall *taosAllocateQall();
|
||||||
void taosFreeQall(STaosQall *qall);
|
void taosFreeQall(STaosQall *qall);
|
||||||
|
@ -77,8 +83,8 @@ int32_t taosGetQueueNumber(STaosQset *qset);
|
||||||
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp);
|
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp);
|
||||||
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp);
|
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp);
|
||||||
void taosResetQsetThread(STaosQset *qset, void *pItem);
|
void taosResetQsetThread(STaosQset *qset, void *pItem);
|
||||||
int32_t taosGetQueueItemsNumber(STaosQueue *queue);
|
|
||||||
int32_t taosGetQsetItemsNumber(STaosQset *qset);
|
extern int64_t tsRpcQueueMemoryAllowed;
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,12 +63,12 @@ cp ${install_files} ${install_dir}
|
||||||
header_files="${top_dir}/include/client/taos.h ${top_dir}/include/util/taoserror.h"
|
header_files="${top_dir}/include/client/taos.h ${top_dir}/include/util/taoserror.h"
|
||||||
cp ${header_files} ${install_dir}/inc
|
cp ${header_files} ${install_dir}/inc
|
||||||
|
|
||||||
bin_files="${compile_dir}/source/dnode/mgmt/taosd ${compile_dir}/tools/shell/taos ${compile_dir}/tests/test/c/create_table ${compile_dir}/tests/test/c/tmq_sim ${script_dir}/remove.sh ${compile_dir}/build/bin/taosBenchmark ${compile_dir}/build/bin/taosdump"
|
bin_files="${compile_dir}/build/bin/taosd ${compile_dir}/build/bin/taos ${compile_dir}/build/bin/create_table ${compile_dir}/build/bin/tmq_sim ${script_dir}/remove.sh ${compile_dir}/build/bin/taosBenchmark ${compile_dir}/build/bin/taosdump"
|
||||||
cp -rf ${bin_files} ${install_dir}/bin && chmod a+x ${install_dir}/bin/* || :
|
cp -rf ${bin_files} ${install_dir}/bin && chmod a+x ${install_dir}/bin/* || :
|
||||||
|
|
||||||
cp -rf ${compile_dir}/source/client/libtaos.so ${install_dir}/lib/
|
cp ${compile_dir}/build/lib/libtaos.so ${install_dir}/lib/
|
||||||
cp -rf ${compile_dir}/source/libs/tdb/libtdb.so ${install_dir}/lib/
|
cp ${compile_dir}/build/lib/libtdb.so ${install_dir}/lib/
|
||||||
cp -rf ${compile_dir}/build/lib/libavro* ${install_dir}/lib/ > /dev/null || echo -e "failed to copy avro libraries"
|
cp ${compile_dir}/build/lib/libavro* ${install_dir}/lib/ > /dev/null || echo -e "failed to copy avro libraries"
|
||||||
cp -rf ${compile_dir}/build/lib/pkgconfig ${install_dir}/lib/ > /dev/null || echo -e "failed to copy pkgconfig directory"
|
cp -rf ${compile_dir}/build/lib/pkgconfig ${install_dir}/lib/ > /dev/null || echo -e "failed to copy pkgconfig directory"
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -439,7 +439,9 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
|
||||||
code = catalogGetSTableMeta(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &pTableMeta);
|
code = catalogGetSTableMeta(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &pTableMeta);
|
||||||
|
|
||||||
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_INVALID_STB) {
|
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_INVALID_STB) {
|
||||||
SSchemaAction schemaAction = { .action = SCHEMA_ACTION_CREATE_STABLE, .createSTable = {0}};
|
SSchemaAction schemaAction;
|
||||||
|
schemaAction.action = SCHEMA_ACTION_CREATE_STABLE;
|
||||||
|
memset(&schemaAction.createSTable, 0, sizeof(SCreateSTableActionInfo));
|
||||||
memcpy(schemaAction.createSTable.sTableName, superTable, superTableLen);
|
memcpy(schemaAction.createSTable.sTableName, superTable, superTableLen);
|
||||||
schemaAction.createSTable.tags = sTableData->tags;
|
schemaAction.createSTable.tags = sTableData->tags;
|
||||||
schemaAction.createSTable.fields = sTableData->cols;
|
schemaAction.createSTable.fields = sTableData->cols;
|
||||||
|
@ -455,7 +457,8 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
|
||||||
taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
|
taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
|
||||||
}
|
}
|
||||||
|
|
||||||
SSchemaAction schemaAction = {.alterSTable = {0}};
|
SSchemaAction schemaAction;
|
||||||
|
memset(&schemaAction, 0, sizeof(SSchemaAction));
|
||||||
memcpy(schemaAction.createSTable.sTableName, superTable, superTableLen);
|
memcpy(schemaAction.createSTable.sTableName, superTable, superTableLen);
|
||||||
code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, &schemaAction, true);
|
code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, &schemaAction, true);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -316,7 +316,7 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
|
||||||
|
|
||||||
void tmqAssignDelayedHbTask(void* param, void* tmrId) {
|
void tmqAssignDelayedHbTask(void* param, void* tmrId) {
|
||||||
tmq_t* tmq = (tmq_t*)param;
|
tmq_t* tmq = (tmq_t*)param;
|
||||||
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t));
|
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
|
||||||
*pTaskType = TMQ_DELAYED_TASK__HB;
|
*pTaskType = TMQ_DELAYED_TASK__HB;
|
||||||
taosWriteQitem(tmq->delayedTask, pTaskType);
|
taosWriteQitem(tmq->delayedTask, pTaskType);
|
||||||
tsem_post(&tmq->rspSem);
|
tsem_post(&tmq->rspSem);
|
||||||
|
@ -324,7 +324,7 @@ void tmqAssignDelayedHbTask(void* param, void* tmrId) {
|
||||||
|
|
||||||
void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
|
void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
|
||||||
tmq_t* tmq = (tmq_t*)param;
|
tmq_t* tmq = (tmq_t*)param;
|
||||||
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t));
|
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
|
||||||
*pTaskType = TMQ_DELAYED_TASK__COMMIT;
|
*pTaskType = TMQ_DELAYED_TASK__COMMIT;
|
||||||
taosWriteQitem(tmq->delayedTask, pTaskType);
|
taosWriteQitem(tmq->delayedTask, pTaskType);
|
||||||
tsem_post(&tmq->rspSem);
|
tsem_post(&tmq->rspSem);
|
||||||
|
@ -332,7 +332,7 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
|
||||||
|
|
||||||
void tmqAssignDelayedReportTask(void* param, void* tmrId) {
|
void tmqAssignDelayedReportTask(void* param, void* tmrId) {
|
||||||
tmq_t* tmq = (tmq_t*)param;
|
tmq_t* tmq = (tmq_t*)param;
|
||||||
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t));
|
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
|
||||||
*pTaskType = TMQ_DELAYED_TASK__REPORT;
|
*pTaskType = TMQ_DELAYED_TASK__REPORT;
|
||||||
taosWriteQitem(tmq->delayedTask, pTaskType);
|
taosWriteQitem(tmq->delayedTask, pTaskType);
|
||||||
tsem_post(&tmq->rspSem);
|
tsem_post(&tmq->rspSem);
|
||||||
|
@ -848,7 +848,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch);
|
tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch);
|
||||||
}
|
}
|
||||||
|
|
||||||
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper));
|
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
|
||||||
if (pRspWrapper == NULL) {
|
if (pRspWrapper == NULL) {
|
||||||
tscWarn("msg discard from vg %d, epoch %d since out of memory", pParam->vgId, pParam->epoch);
|
tscWarn("msg discard from vg %d, epoch %d since out of memory", pParam->vgId, pParam->epoch);
|
||||||
goto CREATE_MSG_FAIL;
|
goto CREATE_MSG_FAIL;
|
||||||
|
@ -987,7 +987,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
tmqUpdateEp(tmq, head->epoch, &rsp);
|
tmqUpdateEp(tmq, head->epoch, &rsp);
|
||||||
tDeleteSMqAskEpRsp(&rsp);
|
tDeleteSMqAskEpRsp(&rsp);
|
||||||
} else {
|
} else {
|
||||||
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper));
|
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
|
||||||
if (pWrapper == NULL) {
|
if (pWrapper == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
code = -1;
|
code = -1;
|
||||||
|
|
|
@ -44,6 +44,7 @@ int32_t tsVnodeShmSize = TSDB_MAX_WAL_SIZE * 10 + 128;
|
||||||
int32_t tsQnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128;
|
int32_t tsQnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128;
|
||||||
int32_t tsSnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128;
|
int32_t tsSnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128;
|
||||||
int32_t tsBnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128;
|
int32_t tsBnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128;
|
||||||
|
int32_t tsNumOfShmThreads = 1;
|
||||||
|
|
||||||
// queue & threads
|
// queue & threads
|
||||||
int32_t tsNumOfRpcThreads = 1;
|
int32_t tsNumOfRpcThreads = 1;
|
||||||
|
@ -375,6 +376,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
|
||||||
|
if (cfgAddInt32(pCfg, "mumOfShmThreads", tsNumOfShmThreads, 1, 1024, 0) != 0) return -1;
|
||||||
|
|
||||||
tsNumOfRpcThreads = tsNumOfCores / 2;
|
tsNumOfRpcThreads = tsNumOfCores / 2;
|
||||||
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, 4);
|
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, 4);
|
||||||
|
@ -428,6 +430,10 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
tsNumOfSnodeUniqueThreads = TRANGE(tsNumOfSnodeUniqueThreads, 2, 4);
|
tsNumOfSnodeUniqueThreads = TRANGE(tsNumOfSnodeUniqueThreads, 2, 4);
|
||||||
if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeUniqueThreads, 1, 1024, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeUniqueThreads, 1, 1024, 0) != 0) return -1;
|
||||||
|
|
||||||
|
tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
|
||||||
|
tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_WAL_SIZE * 10L, TSDB_MAX_WAL_SIZE * 10000L);
|
||||||
|
if (cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, 1, INT64_MAX, 0) != 0) return -1;
|
||||||
|
|
||||||
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1;
|
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, 0) != 0) return -1;
|
||||||
if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, 0) != 0) return -1;
|
if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, 0) != 0) return -1;
|
||||||
|
@ -568,6 +574,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
||||||
tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
|
tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
|
||||||
tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
|
tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
|
||||||
tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32;
|
tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32;
|
||||||
|
tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64;
|
||||||
|
|
||||||
tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval;
|
tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval;
|
||||||
tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32;
|
tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32;
|
||||||
|
|
|
@ -402,7 +402,8 @@ tDataTypeDescriptor tDataTypes[TSDB_DATA_TYPE_MAX] = {
|
||||||
{TSDB_DATA_TYPE_UINT, 12, INT_BYTES, "INT UNSIGNED", 0, UINT32_MAX, tsCompressInt, tsDecompressInt, getStatics_u32},
|
{TSDB_DATA_TYPE_UINT, 12, INT_BYTES, "INT UNSIGNED", 0, UINT32_MAX, tsCompressInt, tsDecompressInt, getStatics_u32},
|
||||||
{TSDB_DATA_TYPE_UBIGINT, 15, LONG_BYTES, "BIGINT UNSIGNED", 0, UINT64_MAX, tsCompressBigint, tsDecompressBigint,
|
{TSDB_DATA_TYPE_UBIGINT, 15, LONG_BYTES, "BIGINT UNSIGNED", 0, UINT64_MAX, tsCompressBigint, tsDecompressBigint,
|
||||||
getStatics_u64},
|
getStatics_u64},
|
||||||
{TSDB_DATA_TYPE_JSON, 4, TSDB_MAX_JSON_TAG_LEN, "JSON", 0, 0, tsCompressString, tsDecompressString, getStatics_nchr},
|
{TSDB_DATA_TYPE_JSON, 4, TSDB_MAX_JSON_TAG_LEN, "JSON", 0, 0, tsCompressString, tsDecompressString,
|
||||||
|
getStatics_nchr},
|
||||||
};
|
};
|
||||||
|
|
||||||
char tTokenTypeSwitcher[13] = {
|
char tTokenTypeSwitcher[13] = {
|
||||||
|
|
|
@ -104,7 +104,7 @@ int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline int32_t mmPutRpcMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pRpc) {
|
static inline int32_t mmPutRpcMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pRpc) {
|
||||||
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
|
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM);
|
||||||
if (pMsg == NULL) return -1;
|
if (pMsg == NULL) return -1;
|
||||||
|
|
||||||
dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType));
|
dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType));
|
||||||
|
|
|
@ -102,7 +102,7 @@ int32_t qmPutNodeMsgToMonitorQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) {
|
static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) {
|
||||||
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
|
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM);
|
||||||
if (pMsg == NULL) {
|
if (pMsg == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -126,10 +126,10 @@ int32_t qmGetQueueSize(SQnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
|
||||||
|
|
||||||
switch (qtype) {
|
switch (qtype) {
|
||||||
case QUERY_QUEUE:
|
case QUERY_QUEUE:
|
||||||
size = taosQueueSize(pMgmt->queryWorker.queue);
|
size = taosQueueItemSize(pMgmt->queryWorker.queue);
|
||||||
break;
|
break;
|
||||||
case FETCH_QUEUE:
|
case FETCH_QUEUE:
|
||||||
size = taosQueueSize(pMgmt->fetchWorker.queue);
|
size = taosQueueItemSize(pMgmt->fetchWorker.queue);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -326,7 +326,7 @@ static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType q
|
||||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
|
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
|
||||||
if (pVnode == NULL) return -1;
|
if (pVnode == NULL) return -1;
|
||||||
|
|
||||||
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
|
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM);
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
if (pMsg != NULL) {
|
if (pMsg != NULL) {
|
||||||
|
@ -397,22 +397,22 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
|
||||||
if (pVnode != NULL) {
|
if (pVnode != NULL) {
|
||||||
switch (qtype) {
|
switch (qtype) {
|
||||||
case WRITE_QUEUE:
|
case WRITE_QUEUE:
|
||||||
size = taosQueueSize(pVnode->pWriteQ);
|
size = taosQueueItemSize(pVnode->pWriteQ);
|
||||||
break;
|
break;
|
||||||
case SYNC_QUEUE:
|
case SYNC_QUEUE:
|
||||||
size = taosQueueSize(pVnode->pSyncQ);
|
size = taosQueueItemSize(pVnode->pSyncQ);
|
||||||
break;
|
break;
|
||||||
case APPLY_QUEUE:
|
case APPLY_QUEUE:
|
||||||
size = taosQueueSize(pVnode->pApplyQ);
|
size = taosQueueItemSize(pVnode->pApplyQ);
|
||||||
break;
|
break;
|
||||||
case QUERY_QUEUE:
|
case QUERY_QUEUE:
|
||||||
size = taosQueueSize(pVnode->pQueryQ);
|
size = taosQueueItemSize(pVnode->pQueryQ);
|
||||||
break;
|
break;
|
||||||
case FETCH_QUEUE:
|
case FETCH_QUEUE:
|
||||||
size = taosQueueSize(pVnode->pFetchQ);
|
size = taosQueueItemSize(pVnode->pFetchQ);
|
||||||
break;
|
break;
|
||||||
case MERGE_QUEUE:
|
case MERGE_QUEUE:
|
||||||
size = taosQueueSize(pVnode->pMergeQ);
|
size = taosQueueItemSize(pVnode->pMergeQ);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -79,7 +79,7 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe
|
||||||
needRelease = true;
|
needRelease = true;
|
||||||
|
|
||||||
if ((msgFp = dmGetMsgFp(pWrapper, pRpc)) == NULL) goto _OVER;
|
if ((msgFp = dmGetMsgFp(pWrapper, pRpc)) == NULL) goto _OVER;
|
||||||
if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg))) == NULL) goto _OVER;
|
if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM)) == NULL) goto _OVER;
|
||||||
if (dmBuildMsg(pMsg, pRpc) != 0) goto _OVER;
|
if (dmBuildMsg(pMsg, pRpc) != 0) goto _OVER;
|
||||||
|
|
||||||
if (pWrapper->procType != DND_PROC_PARENT) {
|
if (pWrapper->procType != DND_PROC_PARENT) {
|
||||||
|
|
|
@ -66,11 +66,7 @@ static void mndPullupTrans(SMnode *pMnode) {
|
||||||
static void mndCalMqRebalance(SMnode *pMnode) {
|
static void mndCalMqRebalance(SMnode *pMnode) {
|
||||||
int32_t contLen = 0;
|
int32_t contLen = 0;
|
||||||
void *pReq = mndBuildTimerMsg(&contLen);
|
void *pReq = mndBuildTimerMsg(&contLen);
|
||||||
SRpcMsg rpcMsg = {
|
SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen};
|
||||||
.msgType = TDMT_MND_MQ_TIMER,
|
|
||||||
.pCont = pReq,
|
|
||||||
.contLen = contLen,
|
|
||||||
};
|
|
||||||
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
|
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -96,8 +96,10 @@ typedef struct {
|
||||||
#pragma pack(push, 1)
|
#pragma pack(push, 1)
|
||||||
typedef struct {
|
typedef struct {
|
||||||
tb_uid_t suid;
|
tb_uid_t suid;
|
||||||
int16_t cid;
|
int32_t cid;
|
||||||
char data[];
|
uint8_t isNull : 1;
|
||||||
|
uint8_t type : 7;
|
||||||
|
uint8_t data[]; // val + uid
|
||||||
} STagIdxKey;
|
} STagIdxKey;
|
||||||
#pragma pack(pop)
|
#pragma pack(pop)
|
||||||
|
|
||||||
|
|
|
@ -227,8 +227,7 @@ static int ctbIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL
|
||||||
static int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
|
static int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
|
||||||
STagIdxKey *pTagIdxKey1 = (STagIdxKey *)pKey1;
|
STagIdxKey *pTagIdxKey1 = (STagIdxKey *)pKey1;
|
||||||
STagIdxKey *pTagIdxKey2 = (STagIdxKey *)pKey2;
|
STagIdxKey *pTagIdxKey2 = (STagIdxKey *)pKey2;
|
||||||
int8_t *p1, *p2;
|
tb_uid_t uid1, uid2;
|
||||||
int8_t type;
|
|
||||||
int c;
|
int c;
|
||||||
|
|
||||||
// compare suid
|
// compare suid
|
||||||
|
@ -245,31 +244,34 @@ static int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// compare value
|
ASSERT(pTagIdxKey1->type == pTagIdxKey2->type);
|
||||||
p1 = pTagIdxKey1->data;
|
|
||||||
p2 = pTagIdxKey2->data;
|
|
||||||
ASSERT(p1[0] == p2[0]);
|
|
||||||
type = p1[0];
|
|
||||||
|
|
||||||
p1++;
|
// check NULL, NULL is always the smallest
|
||||||
p2++;
|
if (pTagIdxKey1->isNull && !pTagIdxKey2->isNull) {
|
||||||
|
return -1;
|
||||||
|
} else if (!pTagIdxKey1->isNull && pTagIdxKey2->isNull) {
|
||||||
|
return 1;
|
||||||
|
} else if (!pTagIdxKey1->isNull && !pTagIdxKey2->isNull) {
|
||||||
|
// all not NULL, compr tag vals
|
||||||
|
c = doCompare(pTagIdxKey1->data, pTagIdxKey2->data, pTagIdxKey1->type, 0);
|
||||||
|
if (c) return c;
|
||||||
|
|
||||||
c = doCompare(p1, p2, type, 0);
|
if (IS_VAR_DATA_TYPE(pTagIdxKey1->type)) {
|
||||||
if (c) return c;
|
uid1 = *(tb_uid_t *)(pTagIdxKey1->data + varDataTLen(pTagIdxKey1->data));
|
||||||
|
uid2 = *(tb_uid_t *)(pTagIdxKey2->data + varDataTLen(pTagIdxKey2->data));
|
||||||
if (IS_VAR_DATA_TYPE(type)) {
|
} else {
|
||||||
p1 = p1 + varDataTLen(p1);
|
uid1 = *(tb_uid_t *)(pTagIdxKey1->data + tDataTypes[pTagIdxKey1->type].bytes);
|
||||||
p2 = p2 + varDataTLen(p2);
|
uid2 = *(tb_uid_t *)(pTagIdxKey2->data + tDataTypes[pTagIdxKey2->type].bytes);
|
||||||
} else {
|
}
|
||||||
p1 = p1 + tDataTypes[type].bytes;
|
|
||||||
p2 = p2 + tDataTypes[type].bytes;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// compare suid
|
// compare uid
|
||||||
if (*(tb_uid_t *)p1 > *(tb_uid_t *)p2) {
|
if (uid1 < uid2) {
|
||||||
return 1;
|
|
||||||
} else if (*(tb_uid_t *)p1 < *(tb_uid_t *)p2) {
|
|
||||||
return -1;
|
return -1;
|
||||||
|
} else if (uid1 > uid2) {
|
||||||
|
return 1;
|
||||||
|
} else {
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -22,7 +22,7 @@ static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME);
|
||||||
static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME);
|
static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME);
|
||||||
static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME);
|
static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME);
|
||||||
static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME);
|
static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME);
|
||||||
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pME);
|
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry);
|
||||||
|
|
||||||
int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||||
SMetaEntry me = {0};
|
SMetaEntry me = {0};
|
||||||
|
@ -389,8 +389,73 @@ static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
||||||
return tdbDbInsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), NULL, 0, &pMeta->txn);
|
return tdbDbInsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), NULL, 0, &pMeta->txn);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
static int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int8_t type, tb_uid_t uid,
|
||||||
// TODO
|
STagIdxKey **ppTagIdxKey, int32_t *nTagIdxKey) {
|
||||||
|
int32_t nTagData = 0;
|
||||||
|
|
||||||
|
if (pTagData) {
|
||||||
|
if (IS_VAR_DATA_TYPE(type)) {
|
||||||
|
nTagData = varDataTLen(pTagData);
|
||||||
|
} else {
|
||||||
|
nTagData = tDataTypes[type].bytes;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*nTagIdxKey = sizeof(STagIdxKey) + nTagData + sizeof(tb_uid_t);
|
||||||
|
|
||||||
|
*ppTagIdxKey = (STagIdxKey *)taosMemoryMalloc(*nTagIdxKey);
|
||||||
|
if (*ppTagIdxKey == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
(*ppTagIdxKey)->suid = suid;
|
||||||
|
(*ppTagIdxKey)->cid = cid;
|
||||||
|
(*ppTagIdxKey)->isNull = (pTagData == NULL) ? 1 : 0;
|
||||||
|
(*ppTagIdxKey)->type = type;
|
||||||
|
if (nTagData) memcpy((*ppTagIdxKey)->data, pTagData, nTagData);
|
||||||
|
*(tb_uid_t *)((*ppTagIdxKey)->data + nTagData) = uid;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey) {
|
||||||
|
if (pTagIdxKey) taosMemoryFree(pTagIdxKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
|
||||||
|
void *pData = NULL;
|
||||||
|
int nData = 0;
|
||||||
|
STbDbKey tbDbKey = {0};
|
||||||
|
SMetaEntry stbEntry = {0};
|
||||||
|
STagIdxKey *pTagIdxKey = NULL;
|
||||||
|
int32_t nTagIdxKey;
|
||||||
|
const SSchema *pTagColumn; // = &stbEntry.stbEntry.schema.pSchema[0];
|
||||||
|
const void *pTagData = NULL; //
|
||||||
|
SDecoder dc = {0};
|
||||||
|
|
||||||
|
// get super table
|
||||||
|
tdbDbGet(pMeta->pUidIdx, &pCtbEntry->ctbEntry.suid, sizeof(tb_uid_t), &pData, &nData);
|
||||||
|
tbDbKey.uid = pCtbEntry->ctbEntry.suid;
|
||||||
|
tbDbKey.version = *(int64_t *)pData;
|
||||||
|
tdbDbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData);
|
||||||
|
|
||||||
|
tDecoderInit(&dc, pData, nData);
|
||||||
|
metaDecodeEntry(&dc, &stbEntry);
|
||||||
|
|
||||||
|
pTagColumn = &stbEntry.stbEntry.schemaTag.pSchema[0];
|
||||||
|
pTagData = tdGetKVRowValOfCol((const SKVRow)pCtbEntry->ctbEntry.pTags, pTagColumn->colId);
|
||||||
|
|
||||||
|
// update tag index
|
||||||
|
if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, pTagColumn->type, pCtbEntry->uid,
|
||||||
|
&pTagIdxKey, &nTagIdxKey) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
tdbDbInsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, &pMeta->txn);
|
||||||
|
metaDestroyTagIdxKey(pTagIdxKey);
|
||||||
|
|
||||||
|
tDecoderClear(&dc);
|
||||||
|
tdbFree(pData);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -88,9 +88,9 @@ static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputDat
|
||||||
|
|
||||||
static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
|
static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
|
||||||
uint32_t capacity = pDispatcher->pManager->cfg.maxDataBlockNumPerQuery;
|
uint32_t capacity = pDispatcher->pManager->cfg.maxDataBlockNumPerQuery;
|
||||||
if (taosQueueSize(pDispatcher->pDataBlocks) > capacity) {
|
if (taosQueueItemSize(pDispatcher->pDataBlocks) > capacity) {
|
||||||
qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity,
|
qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity,
|
||||||
taosQueueSize(pDispatcher->pDataBlocks));
|
taosQueueItemSize(pDispatcher->pDataBlocks));
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -106,7 +106,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
|
||||||
|
|
||||||
static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
|
static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
|
||||||
taosThreadMutexLock(&pDispatcher->mutex);
|
taosThreadMutexLock(&pDispatcher->mutex);
|
||||||
int32_t blockNums = taosQueueSize(pDispatcher->pDataBlocks);
|
int32_t blockNums = taosQueueItemSize(pDispatcher->pDataBlocks);
|
||||||
int32_t status =
|
int32_t status =
|
||||||
(0 == blockNums ? DS_BUF_EMPTY
|
(0 == blockNums ? DS_BUF_EMPTY
|
||||||
: (blockNums < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL));
|
: (blockNums < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL));
|
||||||
|
@ -124,7 +124,7 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
|
||||||
|
|
||||||
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
|
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
|
||||||
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
||||||
SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf));
|
SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM);
|
||||||
if (NULL == pBuf || !allocBuf(pDispatcher, pInput, pBuf)) {
|
if (NULL == pBuf || !allocBuf(pDispatcher, pInput, pBuf)) {
|
||||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
|
@ -200,11 +200,37 @@ TEST(testCase, index_filter) {
|
||||||
doFilterTag(opNode, result);
|
doFilterTag(opNode, result);
|
||||||
EXPECT_EQ(1, taosArrayGetSize(result));
|
EXPECT_EQ(1, taosArrayGetSize(result));
|
||||||
|
|
||||||
|
taosArrayDestroy(result);
|
||||||
|
nodesDestroyNode(res);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL, *res = NULL;
|
||||||
|
sifMakeColumnNode(&pLeft, "test", "col", COLUMN_TYPE_TAG, TSDB_DATA_TYPE_INT);
|
||||||
|
sifMakeValueNode(&pRight, TSDB_DATA_TYPE_INT, &sifRightV);
|
||||||
|
sifMakeOpNode(&opNode, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_INT, pLeft, pRight);
|
||||||
|
SArray *result = taosArrayInit(4, sizeof(uint64_t));
|
||||||
|
doFilterTag(opNode, result);
|
||||||
|
EXPECT_EQ(0, taosArrayGetSize(result));
|
||||||
|
taosArrayDestroy(result);
|
||||||
|
nodesDestroyNode(res);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL, *res = NULL;
|
||||||
|
sifMakeColumnNode(&pLeft, "test", "col", COLUMN_TYPE_TAG, TSDB_DATA_TYPE_INT);
|
||||||
|
sifMakeValueNode(&pRight, TSDB_DATA_TYPE_INT, &sifRightV);
|
||||||
|
sifMakeOpNode(&opNode, OP_TYPE_GREATER_EQUAL, TSDB_DATA_TYPE_DOUBLE, pLeft, pRight);
|
||||||
|
|
||||||
|
SArray *result = taosArrayInit(4, sizeof(uint64_t));
|
||||||
|
doFilterTag(opNode, result);
|
||||||
|
EXPECT_EQ(0, taosArrayGetSize(result));
|
||||||
|
|
||||||
taosArrayDestroy(result);
|
taosArrayDestroy(result);
|
||||||
nodesDestroyNode(res);
|
nodesDestroyNode(res);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// add other greater/lower/equal/in compare func test
|
||||||
|
|
||||||
TEST(testCase, index_filter_varify) {
|
TEST(testCase, index_filter_varify) {
|
||||||
{
|
{
|
||||||
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL, *res = NULL;
|
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL, *res = NULL;
|
||||||
|
|
|
@ -27,10 +27,33 @@
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#define INDEX_NUM_OF_THREADS 4
|
#define INDEX_NUM_OF_THREADS 4
|
||||||
#define INDEX_QUEUE_SIZE 200
|
#define INDEX_QUEUE_SIZE 200
|
||||||
|
|
||||||
void* indexQhandle = NULL;
|
void* indexQhandle = NULL;
|
||||||
|
|
||||||
|
#define INDEX_DATA_BOOL_NULL 0x02
|
||||||
|
#define INDEX_DATA_TINYINT_NULL 0x80
|
||||||
|
#define INDEX_DATA_SMALLINT_NULL 0x8000
|
||||||
|
#define INDEX_DATA_INT_NULL 0x80000000L
|
||||||
|
#define INDEX_DATA_BIGINT_NULL 0x8000000000000000L
|
||||||
|
#define INDEX_DATA_TIMESTAMP_NULL TSDB_DATA_BIGINT_NULL
|
||||||
|
|
||||||
|
#define INDEX_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN
|
||||||
|
#define INDEX_DATA_DOUBLE_NULL 0x7FFFFF0000000000L // an NAN
|
||||||
|
#define INDEX_DATA_NCHAR_NULL 0xFFFFFFFF
|
||||||
|
#define INDEX_DATA_BINARY_NULL 0xFF
|
||||||
|
#define INDEX_DATA_JSON_NULL 0xFFFFFFFF
|
||||||
|
#define INDEX_DATA_JSON_null 0xFFFFFFFE
|
||||||
|
#define INDEX_DATA_JSON_NOT_NULL 0x01
|
||||||
|
|
||||||
|
#define INDEX_DATA_UTINYINT_NULL 0xFF
|
||||||
|
#define INDEX_DATA_USMALLINT_NULL 0xFFFF
|
||||||
|
#define INDEX_DATA_UINT_NULL 0xFFFFFFFF
|
||||||
|
#define INDEX_DATA_UBIGINT_NULL 0xFFFFFFFFFFFFFFFFL
|
||||||
|
|
||||||
|
#define INDEX_DATA_NULL_STR "NULL"
|
||||||
|
#define INDEX_DATA_NULL_STR_L "null"
|
||||||
|
|
||||||
void indexInit() {
|
void indexInit() {
|
||||||
// refactor later
|
// refactor later
|
||||||
indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");
|
indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");
|
||||||
|
@ -67,12 +90,6 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef USE_LUCENE
|
|
||||||
index_t* index = index_open(path);
|
|
||||||
sIdx->index = index;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#ifdef USE_INVERTED_INDEX
|
|
||||||
// sIdx->cache = (void*)indexCacheCreate(sIdx);
|
// sIdx->cache = (void*)indexCacheCreate(sIdx);
|
||||||
sIdx->tindex = indexTFileCreate(path);
|
sIdx->tindex = indexTFileCreate(path);
|
||||||
if (sIdx->tindex == NULL) {
|
if (sIdx->tindex == NULL) {
|
||||||
|
@ -85,7 +102,6 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
|
||||||
taosThreadMutexInit(&sIdx->mtx, NULL);
|
taosThreadMutexInit(&sIdx->mtx, NULL);
|
||||||
*index = sIdx;
|
*index = sIdx;
|
||||||
return 0;
|
return 0;
|
||||||
#endif
|
|
||||||
|
|
||||||
END:
|
END:
|
||||||
if (sIdx != NULL) {
|
if (sIdx != NULL) {
|
||||||
|
@ -97,12 +113,6 @@ END:
|
||||||
}
|
}
|
||||||
|
|
||||||
void indexClose(SIndex* sIdx) {
|
void indexClose(SIndex* sIdx) {
|
||||||
#ifdef USE_LUCENE
|
|
||||||
index_close(sIdex->index);
|
|
||||||
sIdx->index = NULL;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#ifdef USE_INVERTED_INDEX
|
|
||||||
void* iter = taosHashIterate(sIdx->colObj, NULL);
|
void* iter = taosHashIterate(sIdx->colObj, NULL);
|
||||||
while (iter) {
|
while (iter) {
|
||||||
IndexCache** pCache = iter;
|
IndexCache** pCache = iter;
|
||||||
|
@ -114,31 +124,12 @@ void indexClose(SIndex* sIdx) {
|
||||||
taosHashCleanup(sIdx->colObj);
|
taosHashCleanup(sIdx->colObj);
|
||||||
taosThreadMutexDestroy(&sIdx->mtx);
|
taosThreadMutexDestroy(&sIdx->mtx);
|
||||||
indexTFileDestroy(sIdx->tindex);
|
indexTFileDestroy(sIdx->tindex);
|
||||||
#endif
|
|
||||||
taosMemoryFree(sIdx->path);
|
taosMemoryFree(sIdx->path);
|
||||||
taosMemoryFree(sIdx);
|
taosMemoryFree(sIdx);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
|
int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
|
||||||
#ifdef USE_LUCENE
|
|
||||||
index_document_t* doc = index_document_create();
|
|
||||||
|
|
||||||
char buf[16] = {0};
|
|
||||||
sprintf(buf, "%d", uid);
|
|
||||||
|
|
||||||
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
|
|
||||||
SIndexTerm* p = taosArrayGetP(fVals, i);
|
|
||||||
index_document_add(doc, (const char*)(p->key), p->nKey, (const char*)(p->val), p->nVal, 1);
|
|
||||||
}
|
|
||||||
index_document_add(doc, NULL, 0, buf, strlen(buf), 0);
|
|
||||||
|
|
||||||
index_put(index->index, doc);
|
|
||||||
index_document_destroy(doc);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#ifdef USE_INVERTED_INDEX
|
|
||||||
|
|
||||||
// TODO(yihao): reduce the lock range
|
// TODO(yihao): reduce the lock range
|
||||||
taosThreadMutexLock(&index->mtx);
|
taosThreadMutexLock(&index->mtx);
|
||||||
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
|
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
|
||||||
|
@ -170,12 +161,9 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) {
|
int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) {
|
||||||
#ifdef USE_INVERTED_INDEX
|
|
||||||
EIndexOperatorType opera = multiQuerys->opera; // relation of querys
|
EIndexOperatorType opera = multiQuerys->opera; // relation of querys
|
||||||
|
|
||||||
SArray* iRslts = taosArrayInit(4, POINTER_BYTES);
|
SArray* iRslts = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
@ -188,35 +176,14 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result
|
||||||
}
|
}
|
||||||
indexMergeFinalResults(iRslts, opera, result);
|
indexMergeFinalResults(iRslts, opera, result);
|
||||||
indexInterResultsDestroy(iRslts);
|
indexInterResultsDestroy(iRslts);
|
||||||
|
|
||||||
#endif
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int indexDelete(SIndex* index, SIndexMultiTermQuery* query) {
|
int indexDelete(SIndex* index, SIndexMultiTermQuery* query) { return 1; }
|
||||||
#ifdef USE_INVERTED_INDEX
|
int indexRebuild(SIndex* index, SIndexOpts* opts) { return 0; }
|
||||||
|
|
||||||
#endif
|
SIndexOpts* indexOptsCreate() { return NULL; }
|
||||||
|
void indexOptsDestroy(SIndexOpts* opts) { return; }
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
int indexRebuild(SIndex* index, SIndexOpts* opts) {
|
|
||||||
#ifdef USE_INVERTED_INDEX
|
|
||||||
#endif
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
SIndexOpts* indexOptsCreate() {
|
|
||||||
#ifdef USE_LUCENE
|
|
||||||
#endif
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
void indexOptsDestroy(SIndexOpts* opts) {
|
|
||||||
#ifdef USE_LUCENE
|
|
||||||
#endif
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
/*
|
/*
|
||||||
* @param: oper
|
* @param: oper
|
||||||
*
|
*
|
||||||
|
|
|
@ -403,6 +403,19 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) {
|
||||||
EXPECT_EQ(1000, taosArrayGetSize(result));
|
EXPECT_EQ(1000, taosArrayGetSize(result));
|
||||||
indexMultiTermQueryDestroy(mq);
|
indexMultiTermQueryDestroy(mq);
|
||||||
}
|
}
|
||||||
|
{
|
||||||
|
std::string colName("other_column");
|
||||||
|
std::string colVal("100");
|
||||||
|
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
|
||||||
|
colVal.c_str(), colVal.size());
|
||||||
|
|
||||||
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
|
indexMultiTermAdd(terms, term);
|
||||||
|
for (size_t i = 0; i < 1000; i++) {
|
||||||
|
tIndexJsonPut(index, terms, i);
|
||||||
|
}
|
||||||
|
indexMultiTermDestroy(terms);
|
||||||
|
}
|
||||||
{
|
{
|
||||||
std::string colName("test1");
|
std::string colName("test1");
|
||||||
std::string colVal("10");
|
std::string colVal("10");
|
||||||
|
|
|
@ -1112,6 +1112,19 @@ bool nodesIsJsonOp(const SOperatorNode* pOp) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool nodesIsRegularOp(const SOperatorNode* pOp) {
|
||||||
|
switch (pOp->opType) {
|
||||||
|
case OP_TYPE_LIKE:
|
||||||
|
case OP_TYPE_NOT_LIKE:
|
||||||
|
case OP_TYPE_MATCH:
|
||||||
|
case OP_TYPE_NMATCH:
|
||||||
|
return true;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
bool nodesIsTimeorderQuery(const SNode* pQuery) { return false; }
|
bool nodesIsTimeorderQuery(const SNode* pQuery) { return false; }
|
||||||
|
|
||||||
bool nodesIsTimelineQuery(const SNode* pQuery) { return false; }
|
bool nodesIsTimelineQuery(const SNode* pQuery) { return false; }
|
||||||
|
|
|
@ -868,9 +868,9 @@ query_expression_body(A) ::=
|
||||||
query_expression_body(B) UNION query_expression_body(D). { A = createSetOperator(pCxt, SET_OP_TYPE_UNION, B, D); }
|
query_expression_body(B) UNION query_expression_body(D). { A = createSetOperator(pCxt, SET_OP_TYPE_UNION, B, D); }
|
||||||
|
|
||||||
query_primary(A) ::= query_specification(B). { A = B; }
|
query_primary(A) ::= query_specification(B). { A = B; }
|
||||||
//query_primary(A) ::=
|
query_primary(A) ::=
|
||||||
// NK_LP query_expression_body(B)
|
NK_LP query_expression_body(B)
|
||||||
// order_by_clause_opt slimit_clause_opt limit_clause_opt NK_RP. { A = B; }
|
order_by_clause_opt slimit_clause_opt limit_clause_opt NK_RP. { A = B; }
|
||||||
|
|
||||||
%type order_by_clause_opt { SNodeList* }
|
%type order_by_clause_opt { SNodeList* }
|
||||||
%destructor order_by_clause_opt { nodesDestroyList($$); }
|
%destructor order_by_clause_opt { nodesDestroyList($$); }
|
||||||
|
|
|
@ -480,8 +480,8 @@ static EDealRes translateColumn(STranslateContext* pCxt, SColumnNode* pCol) {
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
|
static EDealRes translateValueImpl(STranslateContext* pCxt, SValueNode* pVal, SDataType targetDt) {
|
||||||
uint8_t precision = (NULL != pCxt->pCurrStmt ? pCxt->pCurrStmt->precision : pVal->node.resType.precision);
|
uint8_t precision = (NULL != pCxt->pCurrStmt ? pCxt->pCurrStmt->precision : targetDt.precision);
|
||||||
pVal->node.resType.precision = precision;
|
pVal->node.resType.precision = precision;
|
||||||
if (pVal->placeholderNo > 0) {
|
if (pVal->placeholderNo > 0) {
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
|
@ -493,7 +493,7 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
|
||||||
}
|
}
|
||||||
*(int64_t*)&pVal->typeData = pVal->datum.i;
|
*(int64_t*)&pVal->typeData = pVal->datum.i;
|
||||||
} else {
|
} else {
|
||||||
switch (pVal->node.resType.type) {
|
switch (targetDt.type) {
|
||||||
case TSDB_DATA_TYPE_NULL:
|
case TSDB_DATA_TYPE_NULL:
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_BOOL:
|
case TSDB_DATA_TYPE_BOOL:
|
||||||
|
@ -562,35 +562,54 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
|
||||||
}
|
}
|
||||||
case TSDB_DATA_TYPE_VARCHAR:
|
case TSDB_DATA_TYPE_VARCHAR:
|
||||||
case TSDB_DATA_TYPE_VARBINARY: {
|
case TSDB_DATA_TYPE_VARBINARY: {
|
||||||
pVal->datum.p = taosMemoryCalloc(1, pVal->node.resType.bytes + VARSTR_HEADER_SIZE + 1);
|
pVal->datum.p = taosMemoryCalloc(1, targetDt.bytes + VARSTR_HEADER_SIZE + 1);
|
||||||
if (NULL == pVal->datum.p) {
|
if (NULL == pVal->datum.p) {
|
||||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
|
return generateDealNodeErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
varDataSetLen(pVal->datum.p, pVal->node.resType.bytes);
|
varDataSetLen(pVal->datum.p, targetDt.bytes);
|
||||||
strncpy(varDataVal(pVal->datum.p), pVal->literal, pVal->node.resType.bytes);
|
strncpy(varDataVal(pVal->datum.p), pVal->literal, targetDt.bytes);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TSDB_DATA_TYPE_TIMESTAMP: {
|
case TSDB_DATA_TYPE_TIMESTAMP: {
|
||||||
if (taosParseTime(pVal->literal, &pVal->datum.i, pVal->node.resType.bytes, precision, tsDaylight) !=
|
if (taosParseTime(pVal->literal, &pVal->datum.i, targetDt.bytes, precision, tsDaylight) != TSDB_CODE_SUCCESS) {
|
||||||
TSDB_CODE_SUCCESS) {
|
|
||||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
|
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
|
||||||
}
|
}
|
||||||
*(int64_t*)&pVal->typeData = pVal->datum.i;
|
*(int64_t*)&pVal->typeData = pVal->datum.i;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TSDB_DATA_TYPE_NCHAR:
|
case TSDB_DATA_TYPE_NCHAR: {
|
||||||
|
int32_t bytes = targetDt.bytes * TSDB_NCHAR_SIZE;
|
||||||
|
pVal->datum.p = taosMemoryCalloc(1, bytes + VARSTR_HEADER_SIZE + 1);
|
||||||
|
if (NULL == pVal->datum.p) {
|
||||||
|
return generateDealNodeErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t output = 0;
|
||||||
|
if (!taosMbsToUcs4(pVal->literal, pVal->node.resType.bytes, (TdUcs4*)varDataVal(pVal->datum.p), bytes,
|
||||||
|
&output)) {
|
||||||
|
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
|
||||||
|
}
|
||||||
|
varDataSetLen(pVal->datum.p, output);
|
||||||
|
break;
|
||||||
|
}
|
||||||
case TSDB_DATA_TYPE_JSON:
|
case TSDB_DATA_TYPE_JSON:
|
||||||
case TSDB_DATA_TYPE_DECIMAL:
|
case TSDB_DATA_TYPE_DECIMAL:
|
||||||
case TSDB_DATA_TYPE_BLOB:
|
case TSDB_DATA_TYPE_BLOB:
|
||||||
// todo
|
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
pVal->node.resType = targetDt;
|
||||||
pVal->translate = true;
|
pVal->translate = true;
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
|
||||||
|
return translateValueImpl(pCxt, pVal, pVal->node.resType);
|
||||||
|
}
|
||||||
|
|
||||||
static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
|
static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
|
||||||
if (nodesIsUnaryOp(pOp)) {
|
if (nodesIsUnaryOp(pOp)) {
|
||||||
if (OP_TYPE_MINUS == pOp->opType) {
|
if (OP_TYPE_MINUS == pOp->opType) {
|
||||||
|
@ -635,6 +654,14 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
|
||||||
if (OP_TYPE_IN == pOp->opType || OP_TYPE_NOT_IN == pOp->opType) {
|
if (OP_TYPE_IN == pOp->opType || OP_TYPE_NOT_IN == pOp->opType) {
|
||||||
((SExprNode*)pOp->pRight)->resType = ((SExprNode*)pOp->pLeft)->resType;
|
((SExprNode*)pOp->pRight)->resType = ((SExprNode*)pOp->pLeft)->resType;
|
||||||
}
|
}
|
||||||
|
if (nodesIsRegularOp(pOp)) {
|
||||||
|
if (!IS_STR_DATA_TYPE(((SExprNode*)(pOp->pLeft))->resType.type)) {
|
||||||
|
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pLeft))->aliasName);
|
||||||
|
}
|
||||||
|
if (QUERY_NODE_VALUE != nodeType(pOp->pRight) || !IS_STR_DATA_TYPE(((SExprNode*)(pOp->pRight))->resType.type)) {
|
||||||
|
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName);
|
||||||
|
}
|
||||||
|
}
|
||||||
pOp->node.resType.type = TSDB_DATA_TYPE_BOOL;
|
pOp->node.resType.type = TSDB_DATA_TYPE_BOOL;
|
||||||
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
|
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
|
||||||
} else if (nodesIsJsonOp(pOp)) {
|
} else if (nodesIsJsonOp(pOp)) {
|
||||||
|
@ -3844,7 +3871,7 @@ static int32_t addValToKVRow(STranslateContext* pCxt, SValueNode* pVal, const SS
|
||||||
if (pVal->node.resType.type == TSDB_DATA_TYPE_NULL) {
|
if (pVal->node.resType.type == TSDB_DATA_TYPE_NULL) {
|
||||||
// todo
|
// todo
|
||||||
} else {
|
} else {
|
||||||
tdAddColToKVRow(pBuilder, pSchema->colId, &(pVal->datum.p),
|
tdAddColToKVRow(pBuilder, pSchema->colId, nodesGetValueFromNode(pVal),
|
||||||
IS_VAR_DATA_TYPE(pSchema->type) ? varDataTLen(pVal->datum.p) : TYPE_BYTES[pSchema->type]);
|
IS_VAR_DATA_TYPE(pSchema->type) ? varDataTLen(pVal->datum.p) : TYPE_BYTES[pSchema->type]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3858,11 +3885,23 @@ static int32_t createValueFromFunction(STranslateContext* pCxt, SFunctionNode* p
|
||||||
return scalarCalculateConstants((SNode*)pFunc, (SNode**)pVal);
|
return scalarCalculateConstants((SNode*)pFunc, (SNode**)pVal);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t translateTagVal(STranslateContext* pCxt, SNode* pNode, SValueNode** pVal) {
|
static SDataType schemaToDataType(SSchema* pSchema) {
|
||||||
|
SDataType dt = {.type = pSchema->type, .bytes = pSchema->bytes, .precision = 0, .scale = 0};
|
||||||
|
if (TSDB_DATA_TYPE_VARCHAR == dt.type || TSDB_DATA_TYPE_BINARY == dt.type || TSDB_DATA_TYPE_VARBINARY == dt.type) {
|
||||||
|
dt.bytes -= VARSTR_HEADER_SIZE;
|
||||||
|
} else if (TSDB_DATA_TYPE_NCHAR == dt.type) {
|
||||||
|
dt.bytes = (dt.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
|
||||||
|
}
|
||||||
|
return dt;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t translateTagVal(STranslateContext* pCxt, SSchema* pSchema, SNode* pNode, SValueNode** pVal) {
|
||||||
if (QUERY_NODE_FUNCTION == nodeType(pNode)) {
|
if (QUERY_NODE_FUNCTION == nodeType(pNode)) {
|
||||||
return createValueFromFunction(pCxt, (SFunctionNode*)pNode, pVal);
|
return createValueFromFunction(pCxt, (SFunctionNode*)pNode, pVal);
|
||||||
} else if (QUERY_NODE_VALUE == nodeType(pNode)) {
|
} else if (QUERY_NODE_VALUE == nodeType(pNode)) {
|
||||||
return (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pNode) ? pCxt->errCode : TSDB_CODE_SUCCESS);
|
return (DEAL_RES_ERROR == translateValueImpl(pCxt, (SValueNode*)pNode, schemaToDataType(pSchema))
|
||||||
|
? pCxt->errCode
|
||||||
|
: TSDB_CODE_SUCCESS);
|
||||||
} else {
|
} else {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
@ -3891,7 +3930,7 @@ static int32_t buildKVRowForBindTags(STranslateContext* pCxt, SCreateSubTableCla
|
||||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TAG_NAME, pCol->colName);
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TAG_NAME, pCol->colName);
|
||||||
}
|
}
|
||||||
SValueNode* pVal = NULL;
|
SValueNode* pVal = NULL;
|
||||||
int32_t code = translateTagVal(pCxt, pNode, &pVal);
|
int32_t code = translateTagVal(pCxt, pSchema, pNode, &pVal);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
if (NULL == pVal) {
|
if (NULL == pVal) {
|
||||||
pVal = (SValueNode*)pNode;
|
pVal = (SValueNode*)pNode;
|
||||||
|
@ -3921,7 +3960,7 @@ static int32_t buildKVRowForAllTags(STranslateContext* pCxt, SCreateSubTableClau
|
||||||
int32_t index = 0;
|
int32_t index = 0;
|
||||||
FOREACH(pNode, pStmt->pValsOfTags) {
|
FOREACH(pNode, pStmt->pValsOfTags) {
|
||||||
SValueNode* pVal = NULL;
|
SValueNode* pVal = NULL;
|
||||||
int32_t code = translateTagVal(pCxt, pNode, &pVal);
|
int32_t code = translateTagVal(pCxt, pTagSchema + index, pNode, &pVal);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
if (NULL == pVal) {
|
if (NULL == pVal) {
|
||||||
pVal = (SValueNode*)pNode;
|
pVal = (SValueNode*)pNode;
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -232,4 +232,12 @@ TEST_F(ParserSelectTest, semanticError) {
|
||||||
PARSER_STAGE_TRANSLATE);
|
PARSER_STAGE_TRANSLATE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(ParserSelectTest, setOperator) {
|
||||||
|
useDb("root", "test");
|
||||||
|
|
||||||
|
run("SELECT * FROM t1 UNION ALL SELECT * FROM t1");
|
||||||
|
|
||||||
|
run("(SELECT * FROM t1) UNION ALL (SELECT * FROM t1)");
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace ParserTest
|
} // namespace ParserTest
|
||||||
|
|
|
@ -723,7 +723,10 @@ static int32_t opkGetScanNodesImpl(SLogicNode* pNode, bool* pNotOptimize, SNodeL
|
||||||
|
|
||||||
switch (nodeType(pNode)) {
|
switch (nodeType(pNode)) {
|
||||||
case QUERY_NODE_LOGIC_PLAN_SCAN:
|
case QUERY_NODE_LOGIC_PLAN_SCAN:
|
||||||
return nodesListMakeAppend(pScanNodes, pNode);
|
if (TSDB_SUPER_TABLE != ((SScanLogicNode*)pNode)->pMeta->tableType) {
|
||||||
|
return nodesListMakeAppend(pScanNodes, pNode);
|
||||||
|
}
|
||||||
|
break;
|
||||||
case QUERY_NODE_LOGIC_PLAN_JOIN:
|
case QUERY_NODE_LOGIC_PLAN_JOIN:
|
||||||
code = opkGetScanNodesImpl(nodesListGetNode(pNode->pChildren, 0), pNotOptimize, pScanNodes);
|
code = opkGetScanNodesImpl(nodesListGetNode(pNode->pChildren, 0), pNotOptimize, pScanNodes);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
@ -739,6 +742,7 @@ static int32_t opkGetScanNodesImpl(SLogicNode* pNode, bool* pNotOptimize, SNodeL
|
||||||
|
|
||||||
if (1 != LIST_LENGTH(pNode->pChildren)) {
|
if (1 != LIST_LENGTH(pNode->pChildren)) {
|
||||||
*pNotOptimize = true;
|
*pNotOptimize = true;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
return opkGetScanNodesImpl(nodesListGetNode(pNode->pChildren, 0), pNotOptimize, pScanNodes);
|
return opkGetScanNodesImpl(nodesListGetNode(pNode->pChildren, 0), pNotOptimize, pScanNodes);
|
||||||
|
|
|
@ -23,12 +23,15 @@ class PlanSubqeuryTest : public PlannerTestBase {};
|
||||||
TEST_F(PlanSubqeuryTest, basic) {
|
TEST_F(PlanSubqeuryTest, basic) {
|
||||||
useDb("root", "test");
|
useDb("root", "test");
|
||||||
|
|
||||||
run("select * from (select * from t1)");
|
run("SELECT * FROM (SELECT * FROM t1)");
|
||||||
|
|
||||||
|
// run("SELECT LAST(c1) FROM ( SELECT * FROM t1)");
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(PlanSubqeuryTest, doubleGroupBy) {
|
TEST_F(PlanSubqeuryTest, doubleGroupBy) {
|
||||||
useDb("root", "test");
|
useDb("root", "test");
|
||||||
|
|
||||||
run("select count(*) from (select c1 + c3 a, c1 + count(*) b from t1 where c2 = 'abc' group by c1, c3) where a > 100 "
|
run("SELECT COUNT(*) FROM ("
|
||||||
"group by b");
|
"SELECT c1 + c3 a, c1 + COUNT(*) b FROM t1 WHERE c2 = 'abc' GROUP BY c1, c3) "
|
||||||
|
"WHERE a > 100 GROUP BY b");
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@ void sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode) {
|
||||||
valueNode->datum.i = 0;
|
valueNode->datum.i = 0;
|
||||||
}
|
}
|
||||||
taosMemoryFree(timeStr);
|
taosMemoryFree(timeStr);
|
||||||
|
valueNode->typeData = valueNode->datum.i;
|
||||||
|
|
||||||
valueNode->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
|
valueNode->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||||
valueNode->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;
|
valueNode->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;
|
||||||
|
|
|
@ -92,7 +92,7 @@ int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg) {
|
||||||
syncRpcMsgLog2((char *)"==syncIOEqMsg==", pMsg);
|
syncRpcMsgLog2((char *)"==syncIOEqMsg==", pMsg);
|
||||||
|
|
||||||
SRpcMsg *pTemp;
|
SRpcMsg *pTemp;
|
||||||
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
|
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
|
||||||
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
|
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
|
||||||
|
|
||||||
STaosQueue *pMsgQ = queue;
|
STaosQueue *pMsgQ = queue;
|
||||||
|
@ -360,7 +360,7 @@ static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
syncRpcMsgLog2((char *)"==syncIOProcessRequest==", pMsg);
|
syncRpcMsgLog2((char *)"==syncIOProcessRequest==", pMsg);
|
||||||
SSyncIO *io = pParent;
|
SSyncIO *io = pParent;
|
||||||
SRpcMsg *pTemp;
|
SRpcMsg *pTemp;
|
||||||
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
|
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
|
||||||
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
|
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
|
||||||
taosWriteQitem(io->pMsgQ, pTemp);
|
taosWriteQitem(io->pMsgQ, pTemp);
|
||||||
}
|
}
|
||||||
|
@ -420,7 +420,7 @@ static void syncIOTickQ(void *param, void *tmrId) {
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
syncPingReply2RpcMsg(pMsg, &rpcMsg);
|
syncPingReply2RpcMsg(pMsg, &rpcMsg);
|
||||||
SRpcMsg *pTemp;
|
SRpcMsg *pTemp;
|
||||||
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
|
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
|
||||||
memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg));
|
memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg));
|
||||||
syncRpcMsgLog2((char *)"==syncIOTickQ==", &rpcMsg);
|
syncRpcMsgLog2((char *)"==syncIOTickQ==", &rpcMsg);
|
||||||
taosWriteQitem(io->pMsgQ, pTemp);
|
taosWriteQitem(io->pMsgQ, pTemp);
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
# tdb
|
# tdb
|
||||||
add_library(tdb STATIC "")
|
add_library(tdb SHARED "")
|
||||||
target_sources(tdb
|
target_sources(tdb
|
||||||
PRIVATE
|
PRIVATE
|
||||||
"src/db/tdbPCache.c"
|
"src/db/tdbPCache.c"
|
||||||
|
|
|
@ -145,9 +145,9 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
|
#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
|
||||||
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
|
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
|
||||||
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
|
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
|
||||||
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label)
|
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label)
|
||||||
#define CONN_SHOULD_RELEASE(conn, head) \
|
#define CONN_SHOULD_RELEASE(conn, head) \
|
||||||
do { \
|
do { \
|
||||||
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
|
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
|
||||||
|
@ -223,11 +223,11 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
|
||||||
#define CONN_RELEASE_BY_SERVER(conn) \
|
#define CONN_RELEASE_BY_SERVER(conn) \
|
||||||
(((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
(((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
||||||
|
|
||||||
#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1)
|
#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1)
|
||||||
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1)
|
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1)
|
||||||
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
|
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
|
||||||
|
|
||||||
#define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn)
|
#define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn)
|
||||||
#define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port)
|
#define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port)
|
||||||
|
|
||||||
static void* cliWorkThread(void* arg);
|
static void* cliWorkThread(void* arg);
|
||||||
|
|
|
@ -114,7 +114,7 @@ int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char
|
||||||
void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
SRpcMsg *pTemp;
|
SRpcMsg *pTemp;
|
||||||
|
|
||||||
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
|
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
|
||||||
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
|
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
|
||||||
|
|
||||||
tDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp);
|
tDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp);
|
||||||
|
|
|
@ -103,7 +103,7 @@ int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char
|
||||||
void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
SRpcMsg *pTemp;
|
SRpcMsg *pTemp;
|
||||||
|
|
||||||
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
|
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
|
||||||
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
|
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
|
||||||
|
|
||||||
tDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp);
|
tDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp);
|
||||||
|
|
|
@ -236,3 +236,22 @@ int32_t taosMbsToWchars(TdWchar *pWchars, const char *pStrs, int32_t size) { ret
|
||||||
int32_t taosWcharToMb(char *pStr, TdWchar wchar) { return wctomb(pStr, wchar); }
|
int32_t taosWcharToMb(char *pStr, TdWchar wchar) { return wctomb(pStr, wchar); }
|
||||||
|
|
||||||
int32_t taosWcharsToMbs(char *pStrs, TdWchar *pWchars, int32_t size) { return wcstombs(pStrs, pWchars, size); }
|
int32_t taosWcharsToMbs(char *pStrs, TdWchar *pWchars, int32_t size) { return wcstombs(pStrs, pWchars, size); }
|
||||||
|
|
||||||
|
char *taosStrCaseStr(const char *str, const char *pattern) {
|
||||||
|
size_t i;
|
||||||
|
|
||||||
|
if (!*pattern)
|
||||||
|
return (char*)str;
|
||||||
|
|
||||||
|
for (; *str; str++) {
|
||||||
|
if (toupper(*str) == toupper(*pattern)) {
|
||||||
|
for (i = 1;; i++) {
|
||||||
|
if (!pattern[i])
|
||||||
|
return (char*)str;
|
||||||
|
if (toupper(str[i]) != toupper(pattern[i]))
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
|
@ -95,6 +95,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CFG_NOT_FOUND, "Config not found")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_REPEAT_INIT, "Repeat initialization")
|
TAOS_DEFINE_ERROR(TSDB_CODE_REPEAT_INIT, "Repeat initialization")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_DUP_KEY, "Cannot add duplicate keys to hash")
|
TAOS_DEFINE_ERROR(TSDB_CODE_DUP_KEY, "Cannot add duplicate keys to hash")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_NEED_RETRY, "Retry needed")
|
TAOS_DEFINE_ERROR(TSDB_CODE_NEED_RETRY, "Retry needed")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE, "Out of memory in rpc queue")
|
||||||
|
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory")
|
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs")
|
TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs")
|
||||||
|
|
|
@ -258,8 +258,8 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea
|
||||||
int16_t headLen = CEIL8(rawHeadLen);
|
int16_t headLen = CEIL8(rawHeadLen);
|
||||||
int32_t bodyLen = CEIL8(rawBodyLen);
|
int32_t bodyLen = CEIL8(rawBodyLen);
|
||||||
|
|
||||||
void *pHead = (*mallocHeadFp)(headLen);
|
void *pHead = (*mallocHeadFp)(headLen, RPC_QITEM);
|
||||||
void *pBody = (*mallocBodyFp)(bodyLen);
|
void *pBody = (*mallocBodyFp)(bodyLen, RPC_QITEM);
|
||||||
if (pHead == NULL || pBody == NULL) {
|
if (pHead == NULL || pBody == NULL) {
|
||||||
taosThreadMutexUnlock(&pQueue->mutex);
|
taosThreadMutexUnlock(&pQueue->mutex);
|
||||||
tsem_post(&pQueue->sem);
|
tsem_post(&pQueue->sem);
|
||||||
|
|
|
@ -18,41 +18,45 @@
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
|
|
||||||
|
int64_t tsRpcQueueMemoryAllowed = 0;
|
||||||
|
int64_t tsRpcQueueMemoryUsed = 0;
|
||||||
|
|
||||||
typedef struct STaosQnode STaosQnode;
|
typedef struct STaosQnode STaosQnode;
|
||||||
|
|
||||||
typedef struct STaosQnode {
|
typedef struct STaosQnode {
|
||||||
STaosQnode *next;
|
STaosQnode *next;
|
||||||
STaosQueue *queue;
|
STaosQueue *queue;
|
||||||
|
int32_t size;
|
||||||
|
int8_t itype;
|
||||||
|
int8_t reserved[3];
|
||||||
char item[];
|
char item[];
|
||||||
} STaosQnode;
|
} STaosQnode;
|
||||||
|
|
||||||
typedef struct STaosQueue {
|
typedef struct STaosQueue {
|
||||||
int32_t itemSize;
|
STaosQnode *head;
|
||||||
int32_t numOfItems;
|
STaosQnode *tail;
|
||||||
int32_t threadId;
|
STaosQueue *next; // for queue set
|
||||||
STaosQnode *head;
|
STaosQset *qset; // for queue set
|
||||||
STaosQnode *tail;
|
void *ahandle; // for queue set
|
||||||
STaosQueue *next; // for queue set
|
FItem itemFp;
|
||||||
STaosQset *qset; // for queue set
|
FItems itemsFp;
|
||||||
void *ahandle; // for queue set
|
|
||||||
FItem itemFp;
|
|
||||||
FItems itemsFp;
|
|
||||||
TdThreadMutex mutex;
|
TdThreadMutex mutex;
|
||||||
|
int64_t memOfItems;
|
||||||
|
int32_t numOfItems;
|
||||||
} STaosQueue;
|
} STaosQueue;
|
||||||
|
|
||||||
typedef struct STaosQset {
|
typedef struct STaosQset {
|
||||||
STaosQueue *head;
|
STaosQueue *head;
|
||||||
STaosQueue *current;
|
STaosQueue *current;
|
||||||
TdThreadMutex mutex;
|
TdThreadMutex mutex;
|
||||||
int32_t numOfQueues;
|
tsem_t sem;
|
||||||
int32_t numOfItems;
|
int32_t numOfQueues;
|
||||||
tsem_t sem;
|
int32_t numOfItems;
|
||||||
} STaosQset;
|
} STaosQset;
|
||||||
|
|
||||||
typedef struct STaosQall {
|
typedef struct STaosQall {
|
||||||
STaosQnode *current;
|
STaosQnode *current;
|
||||||
STaosQnode *start;
|
STaosQnode *start;
|
||||||
int32_t itemSize;
|
|
||||||
int32_t numOfItems;
|
int32_t numOfItems;
|
||||||
} STaosQall;
|
} STaosQall;
|
||||||
|
|
||||||
|
@ -118,32 +122,61 @@ bool taosQueueEmpty(STaosQueue *queue) {
|
||||||
return empty;
|
return empty;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosQueueSize(STaosQueue *queue) {
|
int32_t taosQueueItemSize(STaosQueue *queue) {
|
||||||
|
if (queue == NULL) return 0;
|
||||||
|
|
||||||
taosThreadMutexLock(&queue->mutex);
|
taosThreadMutexLock(&queue->mutex);
|
||||||
int32_t numOfItems = queue->numOfItems;
|
int32_t numOfItems = queue->numOfItems;
|
||||||
taosThreadMutexUnlock(&queue->mutex);
|
taosThreadMutexUnlock(&queue->mutex);
|
||||||
return numOfItems;
|
return numOfItems;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *taosAllocateQitem(int32_t size) {
|
int64_t taosQueueMemorySize(STaosQueue *queue) {
|
||||||
|
if (queue == NULL) return 0;
|
||||||
|
|
||||||
|
taosThreadMutexLock(&queue->mutex);
|
||||||
|
int64_t memOfItems = queue->memOfItems;
|
||||||
|
taosThreadMutexUnlock(&queue->mutex);
|
||||||
|
return memOfItems;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *taosAllocateQitem(int32_t size, EQItype itype) {
|
||||||
STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
|
STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
|
||||||
|
pNode->size = size;
|
||||||
|
pNode->itype = itype;
|
||||||
|
|
||||||
if (pNode == NULL) {
|
if (pNode == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
uTrace("item:%p, node:%p is allocated", pNode->item, pNode);
|
if (itype == RPC_QITEM) {
|
||||||
|
int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size);
|
||||||
|
if (alloced > tsRpcQueueMemoryUsed) {
|
||||||
|
taosMemoryFree(pNode);
|
||||||
|
terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
uTrace("item:%p, node:%p is allocated, alloc:%" PRId64, pNode->item, pNode, alloced);
|
||||||
|
} else {
|
||||||
|
uTrace("item:%p, node:%p is allocated", pNode->item, pNode);
|
||||||
|
}
|
||||||
|
|
||||||
return (void *)pNode->item;
|
return (void *)pNode->item;
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosFreeQitem(void *pItem) {
|
void taosFreeQitem(void *pItem) {
|
||||||
if (pItem == NULL) return;
|
if (pItem == NULL) return;
|
||||||
|
|
||||||
char *temp = pItem;
|
STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
|
||||||
temp -= sizeof(STaosQnode);
|
if (pNode->itype > 0) {
|
||||||
uTrace("item:%p, node:%p is freed", pItem, temp);
|
int64_t alloced = atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, pNode->size);
|
||||||
taosMemoryFree(temp);
|
uTrace("item:%p, node:%p is freed, alloc:%" PRId64, pItem, pNode, alloced);
|
||||||
|
} else {
|
||||||
|
uTrace("item:%p, node:%p is freed", pItem, pNode);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(pNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosWriteQitem(STaosQueue *queue, void *pItem) {
|
void taosWriteQitem(STaosQueue *queue, void *pItem) {
|
||||||
|
@ -161,8 +194,9 @@ void taosWriteQitem(STaosQueue *queue, void *pItem) {
|
||||||
}
|
}
|
||||||
|
|
||||||
queue->numOfItems++;
|
queue->numOfItems++;
|
||||||
|
queue->memOfItems += pNode->size;
|
||||||
if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
|
if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
|
||||||
uTrace("item:%p is put into queue:%p, items:%d", pItem, queue, queue->numOfItems);
|
uTrace("item:%p is put into queue:%p, items:%d mem:%" PRId64, pItem, queue, queue->numOfItems, queue->memOfItems);
|
||||||
|
|
||||||
taosThreadMutexUnlock(&queue->mutex);
|
taosThreadMutexUnlock(&queue->mutex);
|
||||||
|
|
||||||
|
@ -181,9 +215,11 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
|
||||||
queue->head = pNode->next;
|
queue->head = pNode->next;
|
||||||
if (queue->head == NULL) queue->tail = NULL;
|
if (queue->head == NULL) queue->tail = NULL;
|
||||||
queue->numOfItems--;
|
queue->numOfItems--;
|
||||||
|
queue->memOfItems -= pNode->size;
|
||||||
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
|
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
|
||||||
code = 1;
|
code = 1;
|
||||||
uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems);
|
uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems,
|
||||||
|
queue->memOfItems);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&queue->mutex);
|
taosThreadMutexUnlock(&queue->mutex);
|
||||||
|
@ -191,7 +227,13 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
STaosQall *taosAllocateQall() { return taosMemoryCalloc(1, sizeof(STaosQall)); }
|
STaosQall *taosAllocateQall() {
|
||||||
|
STaosQall *qall = taosMemoryCalloc(1, sizeof(STaosQall));
|
||||||
|
if (qall != NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
return qall;
|
||||||
|
}
|
||||||
|
|
||||||
void taosFreeQall(STaosQall *qall) { taosMemoryFree(qall); }
|
void taosFreeQall(STaosQall *qall) { taosMemoryFree(qall); }
|
||||||
|
|
||||||
|
@ -207,12 +249,12 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
|
||||||
qall->current = queue->head;
|
qall->current = queue->head;
|
||||||
qall->start = queue->head;
|
qall->start = queue->head;
|
||||||
qall->numOfItems = queue->numOfItems;
|
qall->numOfItems = queue->numOfItems;
|
||||||
qall->itemSize = queue->itemSize;
|
|
||||||
code = qall->numOfItems;
|
code = qall->numOfItems;
|
||||||
|
|
||||||
queue->head = NULL;
|
queue->head = NULL;
|
||||||
queue->tail = NULL;
|
queue->tail = NULL;
|
||||||
queue->numOfItems = 0;
|
queue->numOfItems = 0;
|
||||||
|
queue->memOfItems = 0;
|
||||||
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
|
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -377,9 +419,11 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FI
|
||||||
queue->head = pNode->next;
|
queue->head = pNode->next;
|
||||||
if (queue->head == NULL) queue->tail = NULL;
|
if (queue->head == NULL) queue->tail = NULL;
|
||||||
queue->numOfItems--;
|
queue->numOfItems--;
|
||||||
|
queue->memOfItems -= pNode->size;
|
||||||
atomic_sub_fetch_32(&qset->numOfItems, 1);
|
atomic_sub_fetch_32(&qset->numOfItems, 1);
|
||||||
code = 1;
|
code = 1;
|
||||||
uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems);
|
uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems,
|
||||||
|
queue->memOfItems);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&queue->mutex);
|
taosThreadMutexUnlock(&queue->mutex);
|
||||||
|
@ -411,7 +455,6 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand
|
||||||
qall->current = queue->head;
|
qall->current = queue->head;
|
||||||
qall->start = queue->head;
|
qall->start = queue->head;
|
||||||
qall->numOfItems = queue->numOfItems;
|
qall->numOfItems = queue->numOfItems;
|
||||||
qall->itemSize = queue->itemSize;
|
|
||||||
code = qall->numOfItems;
|
code = qall->numOfItems;
|
||||||
if (ahandle) *ahandle = queue->ahandle;
|
if (ahandle) *ahandle = queue->ahandle;
|
||||||
if (itemsFp) *itemsFp = queue->itemsFp;
|
if (itemsFp) *itemsFp = queue->itemsFp;
|
||||||
|
@ -419,6 +462,7 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand
|
||||||
queue->head = NULL;
|
queue->head = NULL;
|
||||||
queue->tail = NULL;
|
queue->tail = NULL;
|
||||||
queue->numOfItems = 0;
|
queue->numOfItems = 0;
|
||||||
|
queue->memOfItems = 0;
|
||||||
atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
|
atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
|
||||||
for (int32_t j = 1; j < qall->numOfItems; ++j) {
|
for (int32_t j = 1; j < qall->numOfItems; ++j) {
|
||||||
tsem_wait(&qset->sem);
|
tsem_wait(&qset->sem);
|
||||||
|
@ -444,23 +488,3 @@ void taosResetQsetThread(STaosQset *qset, void *pItem) {
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&qset->mutex);
|
taosThreadMutexUnlock(&qset->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosGetQueueItemsNumber(STaosQueue *queue) {
|
|
||||||
if (!queue) return 0;
|
|
||||||
|
|
||||||
int32_t num;
|
|
||||||
taosThreadMutexLock(&queue->mutex);
|
|
||||||
num = queue->numOfItems;
|
|
||||||
taosThreadMutexUnlock(&queue->mutex);
|
|
||||||
return num;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t taosGetQsetItemsNumber(STaosQset *qset) {
|
|
||||||
if (!qset) return 0;
|
|
||||||
|
|
||||||
int32_t num = 0;
|
|
||||||
taosThreadMutexLock(&qset->mutex);
|
|
||||||
num = qset->numOfItems;
|
|
||||||
taosThreadMutexUnlock(&qset->mutex);
|
|
||||||
return num;
|
|
||||||
}
|
|
||||||
|
|
|
@ -49,7 +49,38 @@ class TDTestCase:
|
||||||
print(cur)
|
print(cur)
|
||||||
return cur
|
return cur
|
||||||
|
|
||||||
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg,showRow,cdbName,valgrind=0):
|
def initConsumerTable(self,cdbName='cdb'):
|
||||||
|
tdLog.info("create consume database, and consume info table, and consume result table")
|
||||||
|
tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
|
||||||
|
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
||||||
|
tdSql.query("drop table if exists %s.consumeresult "%(cdbName))
|
||||||
|
|
||||||
|
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
|
||||||
|
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
|
||||||
|
|
||||||
|
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||||
|
sql = "insert into %s.consumeinfo values "%cdbName
|
||||||
|
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
|
||||||
|
tdLog.info("consume info sql: %s"%sql)
|
||||||
|
tdSql.query(sql)
|
||||||
|
|
||||||
|
def selectConsumeResult(self,expectRows,cdbName='cdb'):
|
||||||
|
resultList=[]
|
||||||
|
while 1:
|
||||||
|
tdSql.query("select * from %s.consumeresult"%cdbName)
|
||||||
|
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
|
||||||
|
if tdSql.getRows() == expectRows:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
time.sleep(5)
|
||||||
|
|
||||||
|
for i in range(expectRows):
|
||||||
|
tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
|
||||||
|
resultList.append(tdSql.getData(i , 3))
|
||||||
|
|
||||||
|
return resultList
|
||||||
|
|
||||||
|
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
|
||||||
shellCmd = 'nohup '
|
shellCmd = 'nohup '
|
||||||
if valgrind == 1:
|
if valgrind == 1:
|
||||||
logFile = cfgPath + '/../log/valgrind-tmq.log'
|
logFile = cfgPath + '/../log/valgrind-tmq.log'
|
||||||
|
@ -87,6 +118,8 @@ class TDTestCase:
|
||||||
pre_insert = "insert into "
|
pre_insert = "insert into "
|
||||||
sql = pre_insert
|
sql = pre_insert
|
||||||
|
|
||||||
|
t = time.time()
|
||||||
|
startTs = int(round(t * 1000))
|
||||||
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
|
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
|
||||||
for i in range(ctbNum):
|
for i in range(ctbNum):
|
||||||
sql += " %s_%d values "%(stbName,i)
|
sql += " %s_%d values "%(stbName,i)
|
||||||
|
@ -127,7 +160,7 @@ class TDTestCase:
|
||||||
return
|
return
|
||||||
|
|
||||||
def tmqCase1(self, cfgPath, buildPath):
|
def tmqCase1(self, cfgPath, buildPath):
|
||||||
tdLog.printNoPrefix("======== test case 1: Produce while one consume to subscribe one db")
|
tdLog.printNoPrefix("======== test case 1: Produce while one consume to subscribe one db, inclue 1 stb")
|
||||||
tdLog.info("step 1: create database, stb, ctb and insert data")
|
tdLog.info("step 1: create database, stb, ctb and insert data")
|
||||||
# create and start thread
|
# create and start thread
|
||||||
parameterDict = {'cfg': '', \
|
parameterDict = {'cfg': '', \
|
||||||
|
@ -135,11 +168,13 @@ class TDTestCase:
|
||||||
'vgroups': 4, \
|
'vgroups': 4, \
|
||||||
'stbName': 'stb', \
|
'stbName': 'stb', \
|
||||||
'ctbNum': 10, \
|
'ctbNum': 10, \
|
||||||
'rowsPerTbl': 100000, \
|
'rowsPerTbl': 10000, \
|
||||||
'batchNum': 200, \
|
'batchNum': 100, \
|
||||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||||
parameterDict['cfg'] = cfgPath
|
parameterDict['cfg'] = cfgPath
|
||||||
|
|
||||||
|
self.initConsumerTable()
|
||||||
|
|
||||||
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
|
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
|
||||||
|
|
||||||
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||||
|
@ -149,23 +184,16 @@ class TDTestCase:
|
||||||
topicName1 = 'topic_db1'
|
topicName1 = 'topic_db1'
|
||||||
|
|
||||||
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
||||||
|
|
||||||
tdLog.info("create consume info table and consume result table")
|
|
||||||
cdbName = parameterDict["dbName"]
|
|
||||||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
|
|
||||||
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
|
|
||||||
|
|
||||||
consumerId = 0
|
consumerId = 0
|
||||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||||
topicList = topicName1
|
topicList = topicName1
|
||||||
ifcheckdata = 0
|
ifcheckdata = 0
|
||||||
|
ifManualCommit = 0
|
||||||
keyList = 'group.id:cgrp1,\
|
keyList = 'group.id:cgrp1,\
|
||||||
enable.auto.commit:false,\
|
enable.auto.commit:false,\
|
||||||
auto.commit.interval.ms:6000,\
|
auto.commit.interval.ms:6000,\
|
||||||
auto.offset.reset:earliest'
|
auto.offset.reset:earliest'
|
||||||
sql = "insert into %s.consumeinfo values "%cdbName
|
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
|
|
||||||
tdSql.query(sql)
|
|
||||||
|
|
||||||
event.wait()
|
event.wait()
|
||||||
|
|
||||||
|
@ -173,32 +201,28 @@ class TDTestCase:
|
||||||
pollDelay = 5
|
pollDelay = 5
|
||||||
showMsg = 1
|
showMsg = 1
|
||||||
showRow = 1
|
showRow = 1
|
||||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName)
|
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||||
|
|
||||||
# wait for data ready
|
# wait for data ready
|
||||||
prepareEnvThread.join()
|
prepareEnvThread.join()
|
||||||
|
|
||||||
tdLog.info("insert process end, and start to check consume result")
|
tdLog.info("insert process end, and start to check consume result")
|
||||||
while 1:
|
expectRows = 1
|
||||||
tdSql.query("select * from %s.consumeresult"%cdbName)
|
resultList = self.selectConsumeResult(expectRows)
|
||||||
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
|
totalConsumeRows = 0
|
||||||
if tdSql.getRows() == 1:
|
for i in range(expectRows):
|
||||||
break
|
totalConsumeRows += resultList[i]
|
||||||
else:
|
|
||||||
time.sleep(5)
|
|
||||||
|
|
||||||
tdLog.info("consumer result: %d, %d"%(tdSql.getData(0 , 2), tdSql.getData(0 , 3)))
|
if totalConsumeRows != expectrowcnt:
|
||||||
tdSql.checkData(0 , 1, consumerId)
|
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||||
# mulit rows and mulit tables in one sql, this num of msg is not sure
|
tdLog.exit("tmq consume rows error!")
|
||||||
#tdSql.checkData(0 , 2, expectmsgcnt)
|
|
||||||
tdSql.checkData(0 , 3, expectrowcnt+1)
|
|
||||||
|
|
||||||
tdSql.query("drop topic %s"%topicName1)
|
tdSql.query("drop topic %s"%topicName1)
|
||||||
|
|
||||||
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
||||||
|
|
||||||
def tmqCase2(self, cfgPath, buildPath):
|
def tmqCase2(self, cfgPath, buildPath):
|
||||||
tdLog.printNoPrefix("======== test case 2: Produce while two consumers to subscribe one db")
|
tdLog.printNoPrefix("======== test case 2: Produce while two consumers to subscribe one db, inclue 1 stb")
|
||||||
tdLog.info("step 1: create database, stb, ctb and insert data")
|
tdLog.info("step 1: create database, stb, ctb and insert data")
|
||||||
# create and start thread
|
# create and start thread
|
||||||
parameterDict = {'cfg': '', \
|
parameterDict = {'cfg': '', \
|
||||||
|
@ -206,11 +230,13 @@ class TDTestCase:
|
||||||
'vgroups': 4, \
|
'vgroups': 4, \
|
||||||
'stbName': 'stb', \
|
'stbName': 'stb', \
|
||||||
'ctbNum': 10, \
|
'ctbNum': 10, \
|
||||||
'rowsPerTbl': 100000, \
|
'rowsPerTbl': 10000, \
|
||||||
'batchNum': 100, \
|
'batchNum': 100, \
|
||||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||||
parameterDict['cfg'] = cfgPath
|
parameterDict['cfg'] = cfgPath
|
||||||
|
|
||||||
|
self.initConsumerTable()
|
||||||
|
|
||||||
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
|
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
|
||||||
|
|
||||||
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||||
|
@ -221,27 +247,19 @@ class TDTestCase:
|
||||||
|
|
||||||
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
||||||
|
|
||||||
tdLog.info("create consume info table and consume result table")
|
|
||||||
cdbName = parameterDict["dbName"]
|
|
||||||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
|
|
||||||
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
|
|
||||||
|
|
||||||
consumerId = 0
|
consumerId = 0
|
||||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||||
topicList = topicName1
|
topicList = topicName1
|
||||||
ifcheckdata = 0
|
ifcheckdata = 0
|
||||||
|
ifManualCommit = 0
|
||||||
keyList = 'group.id:cgrp1,\
|
keyList = 'group.id:cgrp1,\
|
||||||
enable.auto.commit:false,\
|
enable.auto.commit:false,\
|
||||||
auto.commit.interval.ms:6000,\
|
auto.commit.interval.ms:6000,\
|
||||||
auto.offset.reset:earliest'
|
auto.offset.reset:earliest'
|
||||||
sql = "insert into %s.consumeinfo values "%cdbName
|
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
|
|
||||||
tdSql.query(sql)
|
|
||||||
|
|
||||||
consumerId = 1
|
consumerId = 1
|
||||||
sql = "insert into %s.consumeinfo values "%cdbName
|
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
|
|
||||||
tdSql.query(sql)
|
|
||||||
|
|
||||||
event.wait()
|
event.wait()
|
||||||
|
|
||||||
|
@ -249,30 +267,20 @@ class TDTestCase:
|
||||||
pollDelay = 5
|
pollDelay = 5
|
||||||
showMsg = 1
|
showMsg = 1
|
||||||
showRow = 1
|
showRow = 1
|
||||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName)
|
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||||
|
|
||||||
# wait for data ready
|
# wait for data ready
|
||||||
prepareEnvThread.join()
|
prepareEnvThread.join()
|
||||||
|
|
||||||
tdLog.info("insert process end, and start to check consume result")
|
tdLog.info("insert process end, and start to check consume result")
|
||||||
while 1:
|
expectRows = 2
|
||||||
tdSql.query("select * from %s.consumeresult"%cdbName)
|
resultList = self.selectConsumeResult(expectRows)
|
||||||
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
|
totalConsumeRows = 0
|
||||||
if tdSql.getRows() == 2:
|
for i in range(expectRows):
|
||||||
break
|
totalConsumeRows += resultList[i]
|
||||||
else:
|
|
||||||
time.sleep(5)
|
|
||||||
|
|
||||||
consumerId0 = tdSql.getData(0 , 1)
|
if totalConsumeRows != expectrowcnt:
|
||||||
consumerId1 = tdSql.getData(1 , 1)
|
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||||
actConsumeRows0 = tdSql.getData(0 , 3)
|
|
||||||
actConsumeRows1 = tdSql.getData(1 , 3)
|
|
||||||
|
|
||||||
tdLog.info("consumer %d rows: %d"%(consumerId0, actConsumeRows0))
|
|
||||||
tdLog.info("consumer %d rows: %d"%(consumerId1, actConsumeRows1))
|
|
||||||
|
|
||||||
totalConsumeRows = actConsumeRows0 + actConsumeRows1
|
|
||||||
if totalConsumeRows != expectrowcnt + 2:
|
|
||||||
tdLog.exit("tmq consume rows error!")
|
tdLog.exit("tmq consume rows error!")
|
||||||
|
|
||||||
tdSql.query("drop topic %s"%topicName1)
|
tdSql.query("drop topic %s"%topicName1)
|
||||||
|
@ -288,11 +296,13 @@ class TDTestCase:
|
||||||
'vgroups': 4, \
|
'vgroups': 4, \
|
||||||
'stbName': 'stb', \
|
'stbName': 'stb', \
|
||||||
'ctbNum': 10, \
|
'ctbNum': 10, \
|
||||||
'rowsPerTbl': 100000, \
|
'rowsPerTbl': 10000, \
|
||||||
'batchNum': 100, \
|
'batchNum': 100, \
|
||||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||||
parameterDict['cfg'] = cfgPath
|
parameterDict['cfg'] = cfgPath
|
||||||
|
|
||||||
|
self.initConsumerTable()
|
||||||
|
|
||||||
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
|
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
|
||||||
|
|
||||||
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||||
|
@ -303,7 +313,7 @@ class TDTestCase:
|
||||||
'vgroups': 4, \
|
'vgroups': 4, \
|
||||||
'stbName': 'stb2', \
|
'stbName': 'stb2', \
|
||||||
'ctbNum': 10, \
|
'ctbNum': 10, \
|
||||||
'rowsPerTbl': 100000, \
|
'rowsPerTbl': 10000, \
|
||||||
'batchNum': 100, \
|
'batchNum': 100, \
|
||||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||||
parameterDict['cfg'] = cfgPath
|
parameterDict['cfg'] = cfgPath
|
||||||
|
@ -316,27 +326,19 @@ class TDTestCase:
|
||||||
|
|
||||||
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
||||||
|
|
||||||
tdLog.info("create consume info table and consume result table")
|
|
||||||
cdbName = parameterDict["dbName"]
|
|
||||||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
|
|
||||||
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
|
|
||||||
|
|
||||||
consumerId = 0
|
consumerId = 0
|
||||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
|
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
|
||||||
topicList = topicName1
|
topicList = topicName1
|
||||||
ifcheckdata = 0
|
ifcheckdata = 0
|
||||||
|
ifManualCommit = 0
|
||||||
keyList = 'group.id:cgrp1,\
|
keyList = 'group.id:cgrp1,\
|
||||||
enable.auto.commit:false,\
|
enable.auto.commit:false,\
|
||||||
auto.commit.interval.ms:6000,\
|
auto.commit.interval.ms:6000,\
|
||||||
auto.offset.reset:earliest'
|
auto.offset.reset:earliest'
|
||||||
sql = "insert into %s.consumeinfo values "%cdbName
|
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
|
|
||||||
tdSql.query(sql)
|
|
||||||
|
|
||||||
# consumerId = 1
|
# consumerId = 1
|
||||||
# sql = "insert into %s.consumeinfo values "%cdbName
|
# self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
# sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
|
|
||||||
# tdSql.query(sql)
|
|
||||||
|
|
||||||
event.wait()
|
event.wait()
|
||||||
|
|
||||||
|
@ -344,31 +346,21 @@ class TDTestCase:
|
||||||
pollDelay = 5
|
pollDelay = 5
|
||||||
showMsg = 1
|
showMsg = 1
|
||||||
showRow = 1
|
showRow = 1
|
||||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName)
|
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||||
|
|
||||||
# wait for data ready
|
# wait for data ready
|
||||||
prepareEnvThread.join()
|
prepareEnvThread.join()
|
||||||
prepareEnvThread2.join()
|
prepareEnvThread2.join()
|
||||||
|
|
||||||
tdLog.info("insert process end, and start to check consume result")
|
tdLog.info("insert process end, and start to check consume result")
|
||||||
while 1:
|
expectRows = 1
|
||||||
tdSql.query("select * from %s.consumeresult"%cdbName)
|
resultList = self.selectConsumeResult(expectRows)
|
||||||
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
|
totalConsumeRows = 0
|
||||||
if tdSql.getRows() == 1:
|
for i in range(expectRows):
|
||||||
break
|
totalConsumeRows += resultList[i]
|
||||||
else:
|
|
||||||
time.sleep(5)
|
|
||||||
|
|
||||||
consumerId0 = tdSql.getData(0 , 1)
|
if totalConsumeRows != expectrowcnt:
|
||||||
#consumerId1 = tdSql.getData(1 , 1)
|
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||||
actConsumeRows0 = tdSql.getData(0 , 3)
|
|
||||||
#actConsumeRows1 = tdSql.getData(1 , 3)
|
|
||||||
|
|
||||||
tdLog.info("consumer %d rows: %d"%(consumerId0, actConsumeRows0))
|
|
||||||
#tdLog.info("consumer %d rows: %d"%(consumerId1, actConsumeRows1))
|
|
||||||
|
|
||||||
#totalConsumeRows = actConsumeRows0 + actConsumeRows1
|
|
||||||
if actConsumeRows0 != expectrowcnt + 1:
|
|
||||||
tdLog.exit("tmq consume rows error!")
|
tdLog.exit("tmq consume rows error!")
|
||||||
|
|
||||||
tdSql.query("drop topic %s"%topicName1)
|
tdSql.query("drop topic %s"%topicName1)
|
||||||
|
@ -386,9 +378,9 @@ class TDTestCase:
|
||||||
cfgPath = buildPath + "/../sim/psim/cfg"
|
cfgPath = buildPath + "/../sim/psim/cfg"
|
||||||
tdLog.info("cfgPath: %s" % cfgPath)
|
tdLog.info("cfgPath: %s" % cfgPath)
|
||||||
|
|
||||||
#self.tmqCase1(cfgPath, buildPath)
|
self.tmqCase1(cfgPath, buildPath)
|
||||||
self.tmqCase2(cfgPath, buildPath)
|
#self.tmqCase2(cfgPath, buildPath)
|
||||||
#self.tmqCase3(cfgPath, buildPath)
|
self.tmqCase3(cfgPath, buildPath)
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
tdSql.close()
|
tdSql.close()
|
||||||
|
|
|
@ -509,7 +509,7 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
|
||||||
|
|
||||||
bool shellIsLimitQuery(const char *sql) {
|
bool shellIsLimitQuery(const char *sql) {
|
||||||
//todo refactor
|
//todo refactor
|
||||||
if (strcasestr(sql, " limit ") != NULL) {
|
if (taosStrCaseStr(sql, " limit ") != NULL) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue