Merge pull request #28709 from taosdata/main
merge: from main to 3.0 branch
This commit is contained in:
commit
f46cf6044d
|
@ -26,6 +26,22 @@ SHOW USERS;
|
|||
```sql
|
||||
ALTER USER TEST DROP HOST HOST_NAME1
|
||||
```
|
||||
说明
|
||||
- 开源版和企业版本都能添加成功,且可以查询到,但是开源版本不会对 IP 做任何限制。
|
||||
- create user u_write pass 'taosdata1' host 'iprange1','iprange2', 可以一次添加多个 iprange, 服务端会做去重,去重的逻辑是需要 iprange 完全一样
|
||||
- 默认会把 127.0.0.1 添加到白名单列表,且在白名单列表可以查询
|
||||
- 集群的节点 IP 集合会自动添加到白名单列表,但是查询不到。
|
||||
- taosadaper 和 taosd 不在一个机器的时候,需要把 taosadaper IP 手动添加到 taosd 白名单列表中
|
||||
- 集群情况下,各个节点 enableWhiteList 成一样,或者全为 false,或者全为 true, 要不然集群无法启动
|
||||
- 白名单变更生效时间 1s,不超过 2s, 每次变更对收发性能有些微影响(多一次判断,可以忽略),变更完之后、影响忽略不计, 变更过程中对集群没有影响,对正在访问客户端也没有影响(假设这些客户端的 IP 包含在 white list 内)
|
||||
- 如果添加两个 ip range, 192.168.1.1/16(假设为 A), 192.168.1.1/24(假设为 B), 严格来说,A 包含了 B,但是考虑情况太复杂,并不会对 A 和 B 做合并
|
||||
- 要删除的时候,必须严格匹配。 也就是如果添加的是 192.168.1.1/24, 要删除也是 192.168.1.1/24
|
||||
- 只有 root 才有权限对其他用户增删 ip white list
|
||||
- 兼容之前的版本,但是不支持从当前版本回退到之前版本
|
||||
- x.x.x.x/32 和 x.x.x.x 属于同一个 iprange, 显示为 x.x.x.x
|
||||
- 如果客户端拿到的 0.0.0.0/0, 说明没有开启白名单
|
||||
- 如果白名单发生了改变, 客户端会在 heartbeat 里检测到。
|
||||
- 针对一个 user, 添加的 IP 个数上限是 2048
|
||||
|
||||
## 审计日志
|
||||
|
||||
|
|
|
@ -2307,6 +2307,7 @@ typedef struct {
|
|||
typedef struct {
|
||||
SExplainRsp rsp;
|
||||
uint64_t qId;
|
||||
uint64_t cId;
|
||||
uint64_t tId;
|
||||
int64_t rId;
|
||||
int32_t eId;
|
||||
|
@ -2660,6 +2661,7 @@ typedef struct SSubQueryMsg {
|
|||
SMsgHead header;
|
||||
uint64_t sId;
|
||||
uint64_t queryId;
|
||||
uint64_t clientId;
|
||||
uint64_t taskId;
|
||||
int64_t refId;
|
||||
int32_t execId;
|
||||
|
@ -2689,6 +2691,7 @@ typedef struct {
|
|||
SMsgHead header;
|
||||
uint64_t sId;
|
||||
uint64_t queryId;
|
||||
uint64_t clientId;
|
||||
uint64_t taskId;
|
||||
int32_t execId;
|
||||
} SQueryContinueReq;
|
||||
|
@ -2723,6 +2726,7 @@ typedef struct {
|
|||
SMsgHead header;
|
||||
uint64_t sId;
|
||||
uint64_t queryId;
|
||||
uint64_t clientId;
|
||||
uint64_t taskId;
|
||||
int32_t execId;
|
||||
SOperatorParam* pOpParam;
|
||||
|
@ -2738,6 +2742,7 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
uint64_t queryId;
|
||||
uint64_t clientId;
|
||||
uint64_t taskId;
|
||||
int64_t refId;
|
||||
int32_t execId;
|
||||
|
@ -2784,6 +2789,7 @@ typedef struct {
|
|||
SMsgHead header;
|
||||
uint64_t sId;
|
||||
uint64_t queryId;
|
||||
uint64_t clientId;
|
||||
uint64_t taskId;
|
||||
int64_t refId;
|
||||
int32_t execId;
|
||||
|
@ -2797,6 +2803,7 @@ typedef struct {
|
|||
SMsgHead header;
|
||||
uint64_t sId;
|
||||
uint64_t queryId;
|
||||
uint64_t clientId;
|
||||
uint64_t taskId;
|
||||
int64_t refId;
|
||||
int32_t execId;
|
||||
|
@ -2813,6 +2820,7 @@ typedef struct {
|
|||
SMsgHead header;
|
||||
uint64_t sId;
|
||||
uint64_t queryId;
|
||||
uint64_t clientId;
|
||||
uint64_t taskId;
|
||||
int64_t refId;
|
||||
int32_t execId;
|
||||
|
@ -4262,6 +4270,7 @@ typedef struct {
|
|||
SMsgHead header;
|
||||
uint64_t sId;
|
||||
uint64_t queryId;
|
||||
uint64_t clientId;
|
||||
uint64_t taskId;
|
||||
uint32_t sqlLen;
|
||||
uint32_t phyLen;
|
||||
|
|
|
@ -62,7 +62,8 @@ static FORCE_INLINE int64_t taosGetTimestampToday(int32_t precision) {
|
|||
int64_t factor = (precision == TSDB_TIME_PRECISION_MILLI) ? 1000
|
||||
: (precision == TSDB_TIME_PRECISION_MICRO) ? 1000000
|
||||
: 1000000000;
|
||||
time_t t = taosTime(NULL);
|
||||
time_t t;
|
||||
(void) taosTime(&t);
|
||||
struct tm tm;
|
||||
(void) taosLocalTime(&t, &tm, NULL, 0);
|
||||
tm.tm_hour = 0;
|
||||
|
|
|
@ -31,7 +31,7 @@ typedef void* DataSinkHandle;
|
|||
struct SRpcMsg;
|
||||
struct SSubplan;
|
||||
|
||||
typedef int32_t (*localFetchFp)(void*, uint64_t, uint64_t, uint64_t, int64_t, int32_t, void**, SArray*);
|
||||
typedef int32_t (*localFetchFp)(void*, uint64_t, uint64_t, uint64_t, uint64_t, int64_t, int32_t, void**, SArray*);
|
||||
|
||||
typedef struct {
|
||||
void* handle;
|
||||
|
|
|
@ -624,6 +624,7 @@ typedef struct SAggPhysiNode {
|
|||
typedef struct SDownstreamSourceNode {
|
||||
ENodeType type;
|
||||
SQueryNodeAddr addr;
|
||||
uint64_t clientId;
|
||||
uint64_t taskId;
|
||||
uint64_t schedId;
|
||||
int32_t execId;
|
||||
|
|
|
@ -105,11 +105,11 @@ void qWorkerDestroy(void **qWorkerMgmt);
|
|||
|
||||
int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat);
|
||||
|
||||
int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId,
|
||||
SQWMsg *qwMsg, SArray *explainRes);
|
||||
int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t cId, uint64_t tId, int64_t rId,
|
||||
int32_t eId, SQWMsg *qwMsg, SArray *explainRes);
|
||||
|
||||
int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId,
|
||||
void **pRsp, SArray *explainRes);
|
||||
int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t cId, uint64_t tId, int64_t rId,
|
||||
int32_t eId, void **pRsp, SArray *explainRes);
|
||||
|
||||
int32_t qWorkerDbgEnableDebug(char *option);
|
||||
|
||||
|
|
|
@ -83,6 +83,9 @@ void schedulerStopQueryHb(void* pTrans);
|
|||
int32_t schedulerUpdatePolicy(int32_t policy);
|
||||
int32_t schedulerEnableReSchedule(bool enableResche);
|
||||
|
||||
int32_t initClientId(void);
|
||||
uint64_t getClientId(void);
|
||||
|
||||
/**
|
||||
* Cancel query job
|
||||
* @param pJob
|
||||
|
|
|
@ -93,7 +93,7 @@ static FORCE_INLINE int64_t taosGetMonoTimestampMs() {
|
|||
char *taosStrpTime(const char *buf, const char *fmt, struct tm *tm);
|
||||
struct tm *taosLocalTime(const time_t *timep, struct tm *result, char *buf, int32_t bufSize);
|
||||
struct tm *taosLocalTimeNolock(struct tm *result, const time_t *timep, int dst);
|
||||
time_t taosTime(time_t *t);
|
||||
int32_t taosTime(time_t *t);
|
||||
time_t taosMktime(struct tm *timep);
|
||||
int64_t user_mktime64(const uint32_t year, const uint32_t mon, const uint32_t day, const uint32_t hour,
|
||||
const uint32_t min, const uint32_t sec, int64_t time_zone);
|
||||
|
|
|
@ -982,6 +982,7 @@ void taos_init_imp(void) {
|
|||
SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100};
|
||||
ENV_ERR_RET(catalogInit(&cfg), "failed to init catalog");
|
||||
ENV_ERR_RET(schedulerInit(), "failed to init scheduler");
|
||||
ENV_ERR_RET(initClientId(), "failed to init clientId");
|
||||
|
||||
tscDebug("starting to initialize TAOS driver");
|
||||
|
||||
|
|
|
@ -166,6 +166,7 @@ const char* columnCompressStr(uint16_t type) {
|
|||
}
|
||||
|
||||
uint8_t columnLevelVal(const char* level) {
|
||||
if (level == NULL) return TSDB_COLVAL_LEVEL_NOCHANGE;
|
||||
uint8_t l = TSDB_COLVAL_LEVEL_MEDIUM;
|
||||
if (0 == strcmp(level, "h") || 0 == strcmp(level, TSDB_COLUMN_LEVEL_HIGH)) {
|
||||
l = TSDB_COLVAL_LEVEL_HIGH;
|
||||
|
@ -180,6 +181,7 @@ uint8_t columnLevelVal(const char* level) {
|
|||
}
|
||||
|
||||
uint16_t columnCompressVal(const char* compress) {
|
||||
if (compress == NULL) return TSDB_COLVAL_COMPRESS_NOCHANGE;
|
||||
uint16_t c = TSDB_COLVAL_COMPRESS_LZ4;
|
||||
if (0 == strcmp(compress, TSDB_COLUMN_COMPRESS_LZ4)) {
|
||||
c = TSDB_COLVAL_COMPRESS_LZ4;
|
||||
|
@ -200,6 +202,7 @@ uint16_t columnCompressVal(const char* compress) {
|
|||
}
|
||||
|
||||
uint8_t columnEncodeVal(const char* encode) {
|
||||
if (encode == NULL) return TSDB_COLVAL_ENCODE_NOCHANGE;
|
||||
uint8_t e = TSDB_COLVAL_ENCODE_SIMPLE8B;
|
||||
if (0 == strcmp(encode, TSDB_COLUMN_ENCODE_SIMPLE8B)) {
|
||||
e = TSDB_COLVAL_ENCODE_SIMPLE8B;
|
||||
|
@ -311,6 +314,7 @@ void setColLevel(uint32_t* compress, uint8_t level) {
|
|||
|
||||
int32_t setColCompressByOption(uint8_t type, uint8_t encode, uint16_t compressType, uint8_t level, bool check,
|
||||
uint32_t* compress) {
|
||||
if(compress == NULL) return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
|
||||
if (check && !validColEncode(type, encode)) return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
|
||||
setColEncode(compress, encode);
|
||||
|
||||
|
|
|
@ -8717,6 +8717,7 @@ int32_t tSerializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq) {
|
|||
TAOS_CHECK_EXIT(tEncodeCStrWithLen(&encoder, pReq->sql, pReq->sqlLen));
|
||||
TAOS_CHECK_EXIT(tEncodeU32(&encoder, pReq->msgLen));
|
||||
TAOS_CHECK_EXIT(tEncodeBinary(&encoder, (uint8_t *)pReq->msg, pReq->msgLen));
|
||||
TAOS_CHECK_EXIT(tEncodeU64(&encoder, pReq->clientId));
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
|
@ -8765,6 +8766,11 @@ int32_t tDeserializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq)
|
|||
TAOS_CHECK_EXIT(tDecodeCStrAlloc(&decoder, &pReq->sql));
|
||||
TAOS_CHECK_EXIT(tDecodeU32(&decoder, &pReq->msgLen));
|
||||
TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void **)&pReq->msg, NULL));
|
||||
if (!tDecodeIsEnd(&decoder)) {
|
||||
TAOS_CHECK_EXIT(tDecodeU64(&decoder, &pReq->clientId));
|
||||
} else {
|
||||
pReq->clientId = 0;
|
||||
}
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
|
@ -8894,6 +8900,7 @@ int32_t tSerializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq) {
|
|||
} else {
|
||||
TAOS_CHECK_EXIT(tEncodeI32(&encoder, 0));
|
||||
}
|
||||
TAOS_CHECK_EXIT(tEncodeU64(&encoder, pReq->clientId));
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
|
@ -8943,6 +8950,11 @@ int32_t tDeserializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq)
|
|||
}
|
||||
TAOS_CHECK_EXIT(tDeserializeSOperatorParam(&decoder, pReq->pOpParam));
|
||||
}
|
||||
if (!tDecodeIsEnd(&decoder)) {
|
||||
TAOS_CHECK_EXIT(tDecodeU64(&decoder, &pReq->clientId));
|
||||
} else {
|
||||
pReq->clientId = 0;
|
||||
}
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
|
@ -9055,6 +9067,7 @@ int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq) {
|
|||
TAOS_CHECK_EXIT(tEncodeU64(&encoder, pReq->taskId));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->refId));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->execId));
|
||||
TAOS_CHECK_EXIT(tEncodeU64(&encoder, pReq->clientId));
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
|
@ -9095,6 +9108,11 @@ int32_t tDeserializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq)
|
|||
TAOS_CHECK_EXIT(tDecodeU64(&decoder, &pReq->taskId));
|
||||
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->refId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->execId));
|
||||
if (!tDecodeIsEnd(&decoder)) {
|
||||
TAOS_CHECK_EXIT(tDecodeU64(&decoder, &pReq->clientId));
|
||||
} else {
|
||||
pReq->clientId = 0;
|
||||
}
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
|
@ -9123,6 +9141,7 @@ int32_t tSerializeSTaskNotifyReq(void *buf, int32_t bufLen, STaskNotifyReq *pReq
|
|||
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->refId));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->execId));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->type));
|
||||
TAOS_CHECK_EXIT(tEncodeU64(&encoder, pReq->clientId));
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
|
@ -9164,6 +9183,11 @@ int32_t tDeserializeSTaskNotifyReq(void *buf, int32_t bufLen, STaskNotifyReq *pR
|
|||
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->refId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->execId));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(&decoder, (int32_t *)&pReq->type));
|
||||
if (!tDecodeIsEnd(&decoder)) {
|
||||
TAOS_CHECK_EXIT(tDecodeU64(&decoder, &pReq->clientId));
|
||||
} else {
|
||||
pReq->clientId = 0;
|
||||
}
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
|
@ -9353,6 +9377,10 @@ int32_t tSerializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *pR
|
|||
TAOS_CHECK_EXIT(tEncodeI32(&encoder, status->execId));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(&encoder, status->status));
|
||||
}
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
STaskStatus *status = taosArrayGet(pRsp->taskStatus, i);
|
||||
TAOS_CHECK_EXIT(tEncodeU64(&encoder, status->clientId));
|
||||
}
|
||||
} else {
|
||||
TAOS_CHECK_EXIT(tEncodeI32(&encoder, 0));
|
||||
}
|
||||
|
@ -9396,6 +9424,12 @@ int32_t tDeserializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *
|
|||
TAOS_CHECK_EXIT(terrno);
|
||||
}
|
||||
}
|
||||
if (!tDecodeIsEnd(&decoder)) {
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
STaskStatus *status = taosArrayGet(pRsp->taskStatus, i);
|
||||
TAOS_CHECK_EXIT(tDecodeU64(&decoder, &status->clientId));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
pRsp->taskStatus = NULL;
|
||||
}
|
||||
|
@ -9560,6 +9594,7 @@ int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) {
|
|||
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->sql));
|
||||
TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->msg, pReq->phyLen));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->source));
|
||||
TAOS_CHECK_EXIT(tEncodeU64(&encoder, pReq->clientId));
|
||||
tEndEncode(&encoder);
|
||||
|
||||
_exit:
|
||||
|
@ -9608,6 +9643,11 @@ int32_t tDeserializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) {
|
|||
if (!tDecodeIsEnd(&decoder)) {
|
||||
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->source));
|
||||
}
|
||||
if (!tDecodeIsEnd(&decoder)) {
|
||||
TAOS_CHECK_EXIT(tDecodeU64(&decoder, &pReq->clientId));
|
||||
} else {
|
||||
pReq->clientId = 0;
|
||||
}
|
||||
tEndDecode(&decoder);
|
||||
|
||||
_exit:
|
||||
|
|
|
@ -30,7 +30,7 @@ static int64_t m_deltaUtc = 0;
|
|||
|
||||
void deltaToUtcInitOnce() {
|
||||
struct tm tm = {0};
|
||||
if (taosStrpTime("1970-01-01 00:00:00", (const char*)("%Y-%m-%d %H:%M:%S"), &tm) != 0) {
|
||||
if (taosStrpTime("1970-01-01 00:00:00", (const char*)("%Y-%m-%d %H:%M:%S"), &tm) == NULL) {
|
||||
uError("failed to parse time string");
|
||||
}
|
||||
m_deltaUtc = (int64_t)taosMktime(&tm);
|
||||
|
|
|
@ -35,6 +35,9 @@
|
|||
extern SConfig* tsCfg;
|
||||
|
||||
static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRetrieveTableRsp** pRsp) {
|
||||
if (NULL == pBlock || NULL == pRsp) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
|
||||
size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
|
||||
*pRsp = taosMemoryCalloc(1, rspSize);
|
||||
|
@ -216,6 +219,9 @@ static int32_t setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock,
|
|||
|
||||
static int32_t execDescribe(bool sysInfoUser, SNode* pStmt, SRetrieveTableRsp** pRsp, int8_t biMode) {
|
||||
SDescribeStmt* pDesc = (SDescribeStmt*)pStmt;
|
||||
if (NULL == pDesc || NULL == pDesc->pMeta) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
int32_t numOfRows = TABLE_TOTAL_COL_NUM(pDesc->pMeta);
|
||||
|
||||
SSDataBlock* pBlock = NULL;
|
||||
|
@ -505,7 +511,7 @@ static int32_t buildCreateViewResultDataBlock(SSDataBlock** pOutput) {
|
|||
return code;
|
||||
}
|
||||
|
||||
void appendColumnFields(char* buf, int32_t* len, STableCfg* pCfg) {
|
||||
static void appendColumnFields(char* buf, int32_t* len, STableCfg* pCfg) {
|
||||
for (int32_t i = 0; i < pCfg->numOfColumns; ++i) {
|
||||
SSchema* pSchema = pCfg->pSchemas + i;
|
||||
#define LTYPE_LEN (32 + 60) // 60 byte for compress info
|
||||
|
@ -539,7 +545,7 @@ void appendColumnFields(char* buf, int32_t* len, STableCfg* pCfg) {
|
|||
}
|
||||
}
|
||||
|
||||
void appendTagFields(char* buf, int32_t* len, STableCfg* pCfg) {
|
||||
static void appendTagFields(char* buf, int32_t* len, STableCfg* pCfg) {
|
||||
for (int32_t i = 0; i < pCfg->numOfTags; ++i) {
|
||||
SSchema* pSchema = pCfg->pSchemas + pCfg->numOfColumns + i;
|
||||
char type[32];
|
||||
|
@ -558,7 +564,7 @@ void appendTagFields(char* buf, int32_t* len, STableCfg* pCfg) {
|
|||
}
|
||||
}
|
||||
|
||||
void appendTagNameFields(char* buf, int32_t* len, STableCfg* pCfg) {
|
||||
static void appendTagNameFields(char* buf, int32_t* len, STableCfg* pCfg) {
|
||||
for (int32_t i = 0; i < pCfg->numOfTags; ++i) {
|
||||
SSchema* pSchema = pCfg->pSchemas + pCfg->numOfColumns + i;
|
||||
*len += tsnprintf(buf + VARSTR_HEADER_SIZE + *len, SHOW_CREATE_TB_RESULT_FIELD2_LEN - (VARSTR_HEADER_SIZE + *len),
|
||||
|
@ -566,7 +572,7 @@ void appendTagNameFields(char* buf, int32_t* len, STableCfg* pCfg) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t appendTagValues(char* buf, int32_t* len, STableCfg* pCfg) {
|
||||
static int32_t appendTagValues(char* buf, int32_t* len, STableCfg* pCfg) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SArray* pTagVals = NULL;
|
||||
STag* pTag = (STag*)pCfg->pTags;
|
||||
|
@ -643,7 +649,7 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
void appendTableOptions(char* buf, int32_t* len, SDbCfgInfo* pDbCfg, STableCfg* pCfg) {
|
||||
static void appendTableOptions(char* buf, int32_t* len, SDbCfgInfo* pDbCfg, STableCfg* pCfg) {
|
||||
if (pCfg->commentLen > 0) {
|
||||
*len += tsnprintf(buf + VARSTR_HEADER_SIZE + *len, SHOW_CREATE_TB_RESULT_FIELD2_LEN - (VARSTR_HEADER_SIZE + *len),
|
||||
" COMMENT '%s'", pCfg->pComment);
|
||||
|
@ -997,7 +1003,7 @@ static int32_t createSelectResultDataBlock(SNodeList* pProjects, SSDataBlock** p
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t buildSelectResultDataBlock(SNodeList* pProjects, SSDataBlock* pBlock) {
|
||||
static int32_t buildSelectResultDataBlock(SNodeList* pProjects, SSDataBlock* pBlock) {
|
||||
QRY_ERR_RET(blockDataEnsureCapacity(pBlock, 1));
|
||||
|
||||
int32_t index = 0;
|
||||
|
|
|
@ -30,8 +30,8 @@ char *gJoinTypeStr[JOIN_TYPE_MAX_VALUE][JOIN_STYPE_MAX_VALUE] = {
|
|||
/*FULL*/ {"Full Join", "Full Join", NULL, NULL, NULL, NULL},
|
||||
};
|
||||
|
||||
int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplainResNode **pRes);
|
||||
int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level, bool singleChannel);
|
||||
static int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplainResNode **pRes);
|
||||
static int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level, bool singleChannel);
|
||||
|
||||
char *qExplainGetDynQryCtrlType(EDynQueryType type) {
|
||||
switch (type) {
|
||||
|
@ -118,7 +118,7 @@ void qExplainFreeCtx(SExplainCtx *pCtx) {
|
|||
taosMemoryFree(pCtx);
|
||||
}
|
||||
|
||||
int32_t qExplainInitCtx(SExplainCtx **pCtx, SHashObj *groupHash, bool verbose, double ratio, EExplainMode mode) {
|
||||
static int32_t qExplainInitCtx(SExplainCtx **pCtx, SHashObj *groupHash, bool verbose, double ratio, EExplainMode mode) {
|
||||
int32_t code = 0;
|
||||
SExplainCtx *ctx = taosMemoryCalloc(1, sizeof(SExplainCtx));
|
||||
if (NULL == ctx) {
|
||||
|
@ -158,7 +158,7 @@ _return:
|
|||
QRY_RET(code);
|
||||
}
|
||||
|
||||
int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNodeList **pChildren) {
|
||||
static int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNodeList **pChildren) {
|
||||
int32_t tlen = 0;
|
||||
SNodeList *pPhysiChildren = pNode->pChildren;
|
||||
|
||||
|
@ -180,7 +180,7 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qExplainGenerateResNodeExecInfo(SPhysiNode *pNode, SArray **pExecInfo, SExplainGroup *group) {
|
||||
static int32_t qExplainGenerateResNodeExecInfo(SPhysiNode *pNode, SArray **pExecInfo, SExplainGroup *group) {
|
||||
*pExecInfo = taosArrayInit(group->nodeNum, sizeof(SExplainExecInfo));
|
||||
if (NULL == (*pExecInfo)) {
|
||||
qError("taosArrayInit %d explainExecInfo failed", group->nodeNum);
|
||||
|
@ -217,7 +217,7 @@ int32_t qExplainGenerateResNodeExecInfo(SPhysiNode *pNode, SArray **pExecInfo, S
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplainResNode **pResNode) {
|
||||
static int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplainResNode **pResNode) {
|
||||
if (NULL == pNode) {
|
||||
*pResNode = NULL;
|
||||
qError("physical node is NULL");
|
||||
|
@ -250,7 +250,7 @@ _return:
|
|||
QRY_RET(code);
|
||||
}
|
||||
|
||||
int32_t qExplainBufAppendExecInfo(SArray *pExecInfo, char *tbuf, int32_t *len) {
|
||||
static int32_t qExplainBufAppendExecInfo(SArray *pExecInfo, char *tbuf, int32_t *len) {
|
||||
int32_t tlen = *len;
|
||||
int32_t nodeNum = taosArrayGetSize(pExecInfo);
|
||||
SExplainExecInfo maxExecInfo = {0};
|
||||
|
@ -275,7 +275,7 @@ int32_t qExplainBufAppendExecInfo(SArray *pExecInfo, char *tbuf, int32_t *len) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qExplainBufAppendVerboseExecInfo(SArray *pExecInfo, char *tbuf, int32_t *len) {
|
||||
static int32_t qExplainBufAppendVerboseExecInfo(SArray *pExecInfo, char *tbuf, int32_t *len) {
|
||||
int32_t tlen = 0;
|
||||
bool gotVerbose = false;
|
||||
int32_t nodeNum = taosArrayGetSize(pExecInfo);
|
||||
|
@ -297,7 +297,7 @@ int32_t qExplainBufAppendVerboseExecInfo(SArray *pExecInfo, char *tbuf, int32_t
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qExplainResAppendRow(SExplainCtx *ctx, char *tbuf, int32_t len, int32_t level) {
|
||||
static int32_t qExplainResAppendRow(SExplainCtx *ctx, char *tbuf, int32_t len, int32_t level) {
|
||||
SQueryExplainRowInfo row = {0};
|
||||
row.buf = taosMemoryMalloc(len);
|
||||
if (NULL == row.buf) {
|
||||
|
@ -362,7 +362,7 @@ static char* qExplainGetScanDataLoad(STableScanPhysiNode* pScan) {
|
|||
return "unknown";
|
||||
}
|
||||
|
||||
int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, int32_t level) {
|
||||
static int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, int32_t level) {
|
||||
int32_t tlen = 0;
|
||||
bool isVerboseLine = false;
|
||||
char *tbuf = ctx->tbuf;
|
||||
|
@ -1900,7 +1900,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qExplainResNodeToRows(SExplainResNode *pResNode, SExplainCtx *ctx, int32_t level) {
|
||||
static int32_t qExplainResNodeToRows(SExplainResNode *pResNode, SExplainCtx *ctx, int32_t level) {
|
||||
if (NULL == pResNode) {
|
||||
qError("explain res node is NULL");
|
||||
QRY_ERR_RET(TSDB_CODE_APP_ERROR);
|
||||
|
@ -1915,7 +1915,7 @@ int32_t qExplainResNodeToRows(SExplainResNode *pResNode, SExplainCtx *ctx, int32
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level, bool singleChannel) {
|
||||
static int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level, bool singleChannel) {
|
||||
SExplainResNode *node = NULL;
|
||||
int32_t code = 0;
|
||||
SExplainCtx *ctx = (SExplainCtx *)pCtx;
|
||||
|
@ -1940,7 +1940,7 @@ _return:
|
|||
QRY_RET(code);
|
||||
}
|
||||
|
||||
int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
|
||||
static int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
|
||||
int32_t code = 0;
|
||||
SSDataBlock *pBlock = NULL;
|
||||
SExplainCtx *pCtx = (SExplainCtx *)ctx;
|
||||
|
@ -1997,7 +1997,7 @@ _return:
|
|||
QRY_RET(code);
|
||||
}
|
||||
|
||||
int32_t qExplainPrepareCtx(SQueryPlan *pDag, SExplainCtx **pCtx) {
|
||||
static int32_t qExplainPrepareCtx(SQueryPlan *pDag, SExplainCtx **pCtx) {
|
||||
int32_t code = 0;
|
||||
SNodeListNode *plans = NULL;
|
||||
int32_t taskNum = 0;
|
||||
|
@ -2080,7 +2080,7 @@ _return:
|
|||
QRY_RET(code);
|
||||
}
|
||||
|
||||
int32_t qExplainAppendPlanRows(SExplainCtx *pCtx) {
|
||||
static int32_t qExplainAppendPlanRows(SExplainCtx *pCtx) {
|
||||
if (EXPLAIN_MODE_ANALYZE != pCtx->mode) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -2103,7 +2103,7 @@ int32_t qExplainAppendPlanRows(SExplainCtx *pCtx) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qExplainGenerateRsp(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp) {
|
||||
static int32_t qExplainGenerateRsp(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp) {
|
||||
QRY_ERR_RET(qExplainAppendGroupResRows(pCtx, pCtx->rootGroupId, 0, false));
|
||||
QRY_ERR_RET(qExplainAppendPlanRows(pCtx));
|
||||
QRY_ERR_RET(qExplainGetRspFromCtx(pCtx, pRsp));
|
||||
|
|
|
@ -121,10 +121,10 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
|
|||
}
|
||||
} else {
|
||||
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
||||
qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64
|
||||
", totalRows:%" PRIu64 ", try next %d/%" PRIzu,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows,
|
||||
pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
|
||||
qDebug("%s vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
|
||||
" execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", try next %d/%" PRIzu,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
|
||||
pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
|
||||
taosMemoryFreeClear(pDataInfo->pRsp);
|
||||
}
|
||||
break;
|
||||
|
@ -141,17 +141,17 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
|
|||
|
||||
if (pRsp->completed == 1) {
|
||||
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
|
||||
qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
|
||||
" execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64
|
||||
", totalRows:%" PRIu64 ", total:%.2f Kb, try next %d/%" PRIzu,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks,
|
||||
pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, i + 1,
|
||||
totalSources);
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
|
||||
pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows,
|
||||
pLoadInfo->totalSize / 1024.0, i + 1, totalSources);
|
||||
} else {
|
||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d blocks:%d, numOfRows:%" PRId64
|
||||
", totalRows:%" PRIu64 ", total:%.2f Kb",
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks,
|
||||
pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
|
||||
qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
|
||||
" execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb",
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
|
||||
pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(pDataInfo->pRsp);
|
||||
|
@ -640,9 +640,9 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
|
|||
|
||||
if (pSource->localExec) {
|
||||
SDataBuf pBuf = {0};
|
||||
int32_t code =
|
||||
(*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId,
|
||||
pSource->taskId, 0, pSource->execId, &pBuf.pData, pTaskInfo->localFetch.explainRes);
|
||||
int32_t code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId,
|
||||
pSource->clientId, pSource->taskId, 0, pSource->execId, &pBuf.pData,
|
||||
pTaskInfo->localFetch.explainRes);
|
||||
code = loadRemoteDataCallback(pWrapper, &pBuf, code);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
taosMemoryFree(pWrapper);
|
||||
|
@ -650,6 +650,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
|
|||
SResFetchReq req = {0};
|
||||
req.header.vgId = pSource->addr.nodeId;
|
||||
req.sId = pSource->schedId;
|
||||
req.clientId = pSource->clientId;
|
||||
req.taskId = pSource->taskId;
|
||||
req.queryId = pTaskInfo->id.queryId;
|
||||
req.execId = pSource->execId;
|
||||
|
@ -691,9 +692,10 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
|
|||
|
||||
freeOperatorParam(req.pOpParam, OP_GET_PARAM);
|
||||
|
||||
qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %p, %d/%" PRIzu,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId,
|
||||
pSource->execId, pExchangeInfo, sourceIndex, totalSources);
|
||||
qDebug("%s build fetch msg and send to vgId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
|
||||
", execId:%d, %p, %d/%" PRIzu,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->clientId,
|
||||
pSource->taskId, pSource->execId, pExchangeInfo, sourceIndex, totalSources);
|
||||
|
||||
// send the fetch remote task result reques
|
||||
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||
|
@ -974,8 +976,9 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
if (pDataInfo->code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d error happens, code:%s", GET_TASKID(pTaskInfo),
|
||||
pSource->addr.nodeId, pSource->taskId, pSource->execId, tstrerror(pDataInfo->code));
|
||||
qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
|
||||
tstrerror(pDataInfo->code));
|
||||
pOperator->pTaskInfo->code = pDataInfo->code;
|
||||
return pOperator->pTaskInfo->code;
|
||||
}
|
||||
|
@ -984,10 +987,10 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
|
|||
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
|
||||
|
||||
if (pRsp->numOfRows == 0) {
|
||||
qDebug("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d %d of total completed, rowsOfSource:%" PRIu64
|
||||
", totalRows:%" PRIu64 " try next",
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pExchangeInfo->current + 1,
|
||||
pDataInfo->totalRows, pLoadInfo->totalRows);
|
||||
qDebug("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
|
||||
" execId:%d %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next",
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
|
||||
pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
|
||||
|
||||
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
||||
pExchangeInfo->current += 1;
|
||||
|
@ -1002,19 +1005,19 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
|
|||
|
||||
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
|
||||
if (pRsp->completed == 1) {
|
||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
|
||||
qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
|
||||
", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows,
|
||||
pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1,
|
||||
totalSources);
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
|
||||
pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
|
||||
pExchangeInfo->current + 1, totalSources);
|
||||
|
||||
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
||||
pExchangeInfo->current += 1;
|
||||
} else {
|
||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64 ", totalRows:%" PRIu64
|
||||
", totalBytes:%" PRIu64,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows,
|
||||
pLoadInfo->totalRows, pLoadInfo->totalSize);
|
||||
qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
|
||||
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
|
||||
pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
|
||||
}
|
||||
|
||||
updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
|
||||
|
|
|
@ -188,7 +188,11 @@ static int32_t countTrailingSpaces(const SValueNode* pVal, bool isLtrim) {
|
|||
|
||||
static int32_t addTimezoneParam(SNodeList* pList) {
|
||||
char buf[TD_TIME_STR_LEN] = {0};
|
||||
time_t t = taosTime(NULL);
|
||||
time_t t;
|
||||
int32_t code = taosTime(&t);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
struct tm tmInfo;
|
||||
if (taosLocalTime(&t, &tmInfo, buf, sizeof(buf)) != NULL) {
|
||||
(void)strftime(buf, sizeof(buf), "%z", &tmInfo);
|
||||
|
@ -196,7 +200,7 @@ static int32_t addTimezoneParam(SNodeList* pList) {
|
|||
int32_t len = (int32_t)strlen(buf);
|
||||
|
||||
SValueNode* pVal = NULL;
|
||||
int32_t code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&pVal);
|
||||
code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&pVal);
|
||||
if (pVal == NULL) {
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -64,6 +64,10 @@ static void udfWatchUdfd(void *args);
|
|||
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal) {
|
||||
fnInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);
|
||||
SUdfdData *pData = process->data;
|
||||
if(pData == NULL) {
|
||||
fnError("udfd process data is NULL");
|
||||
return;
|
||||
}
|
||||
if (exitStatus == 0 && termSignal == 0 || atomic_load_32(&pData->stopCalled)) {
|
||||
fnInfo("udfd process exit due to SIGINT or dnode-mgmt called stop");
|
||||
} else {
|
||||
|
|
|
@ -1507,7 +1507,7 @@ static void removeListeningPipe() {
|
|||
int err = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL);
|
||||
uv_fs_req_cleanup(&req);
|
||||
if(err) {
|
||||
fnError("remove listening pipe %s failed, reason:%s, lino:%d", global.listenPipeName, uv_strerror(err), __LINE__);
|
||||
fnInfo("remove listening pipe %s : %s, lino:%d", global.listenPipeName, uv_strerror(err), __LINE__);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -851,6 +851,7 @@ static int32_t slotDescCopy(const SSlotDescNode* pSrc, SSlotDescNode* pDst) {
|
|||
|
||||
static int32_t downstreamSourceCopy(const SDownstreamSourceNode* pSrc, SDownstreamSourceNode* pDst) {
|
||||
COPY_OBJECT_FIELD(addr, sizeof(SQueryNodeAddr));
|
||||
COPY_SCALAR_FIELD(clientId);
|
||||
COPY_SCALAR_FIELD(taskId);
|
||||
COPY_SCALAR_FIELD(schedId);
|
||||
COPY_SCALAR_FIELD(execId);
|
||||
|
|
|
@ -5259,6 +5259,7 @@ static int32_t jsonToColumnDefNode(const SJson* pJson, void* pObj) {
|
|||
}
|
||||
|
||||
static const char* jkDownstreamSourceAddr = "Addr";
|
||||
static const char* jkDownstreamSourceClientId = "ClientId";
|
||||
static const char* jkDownstreamSourceTaskId = "TaskId";
|
||||
static const char* jkDownstreamSourceSchedId = "SchedId";
|
||||
static const char* jkDownstreamSourceExecId = "ExecId";
|
||||
|
@ -5268,6 +5269,9 @@ static int32_t downstreamSourceNodeToJson(const void* pObj, SJson* pJson) {
|
|||
const SDownstreamSourceNode* pNode = (const SDownstreamSourceNode*)pObj;
|
||||
|
||||
int32_t code = tjsonAddObject(pJson, jkDownstreamSourceAddr, queryNodeAddrToJson, &pNode->addr);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceClientId, pNode->clientId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceTaskId, pNode->taskId);
|
||||
}
|
||||
|
@ -5288,6 +5292,9 @@ static int32_t jsonToDownstreamSourceNode(const SJson* pJson, void* pObj) {
|
|||
SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)pObj;
|
||||
|
||||
int32_t code = tjsonToObject(pJson, jkDownstreamSourceAddr, jsonToQueryNodeAddr, &pNode->addr);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetUBigIntValue(pJson, jkDownstreamSourceClientId, &pNode->clientId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetUBigIntValue(pJson, jkDownstreamSourceTaskId, &pNode->taskId);
|
||||
}
|
||||
|
|
|
@ -1769,6 +1769,9 @@ static int32_t downstreamSourceNodeInlineToMsg(const void* pObj, STlvEncoder* pE
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeValueI32(pEncoder, pNode->fetchMsgType);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeValueU64(pEncoder, pNode->clientId);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -1793,6 +1796,9 @@ static int32_t msgToDownstreamSourceNodeInlineToMsg(STlvDecoder* pDecoder, void*
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvDecodeValueI32(pDecoder, &pNode->fetchMsgType);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && !tlvDecodeEnd(pDecoder)) {
|
||||
code = tlvDecodeValueU64(pDecoder, &pNode->clientId);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -246,7 +246,7 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, E
|
|||
return code;
|
||||
}
|
||||
|
||||
static int parseTimestampOrInterval(const char** end, SToken* pToken, int16_t timePrec, int64_t* ts, int64_t* interval,
|
||||
static int32_t parseTimestampOrInterval(const char** end, SToken* pToken, int16_t timePrec, int64_t* ts, int64_t* interval,
|
||||
SMsgBuf* pMsgBuf, bool* isTs) {
|
||||
if (pToken->type == TK_NOW) {
|
||||
*isTs = true;
|
||||
|
|
|
@ -215,8 +215,8 @@ typedef struct SQWorkerMgmt {
|
|||
#define QW_CTX_NOT_EXISTS_ERR_CODE(mgmt) \
|
||||
(atomic_load_8(&(mgmt)->nodeStopped) ? TSDB_CODE_VND_STOPPED : TSDB_CODE_QRY_TASK_CTX_NOT_EXIST)
|
||||
|
||||
#define QW_FPARAMS_DEF SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId
|
||||
#define QW_IDS() sId, qId, tId, rId, eId
|
||||
#define QW_FPARAMS_DEF SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t cId, uint64_t tId, int64_t rId, int32_t eId
|
||||
#define QW_IDS() sId, qId, cId, tId, rId, eId
|
||||
#define QW_FPARAMS() mgmt, QW_IDS()
|
||||
|
||||
#define QW_STAT_INC(_item, _n) (void)atomic_add_fetch_64(&(_item), _n)
|
||||
|
@ -257,18 +257,20 @@ typedef struct SQWorkerMgmt {
|
|||
#define QW_FETCH_RUNNING(ctx) ((ctx)->inFetch)
|
||||
#define QW_QUERY_NOT_STARTED(ctx) (QW_GET_PHASE(ctx) == -1)
|
||||
|
||||
#define QW_SET_QTID(id, qId, tId, eId) \
|
||||
do { \
|
||||
*(uint64_t *)(id) = (qId); \
|
||||
*(uint64_t *)((char *)(id) + sizeof(qId)) = (tId); \
|
||||
*(int32_t *)((char *)(id) + sizeof(qId) + sizeof(tId)) = (eId); \
|
||||
#define QW_SET_QTID(id, qId, cId, tId, eId) \
|
||||
do { \
|
||||
*(uint64_t *)(id) = (qId); \
|
||||
*(uint64_t *)((char *)(id) + sizeof(qId)) = (cId); \
|
||||
*(uint64_t *)((char *)(id) + sizeof(qId) + sizeof(cId)) = (tId); \
|
||||
*(int32_t *)((char *)(id) + sizeof(qId) + sizeof(cId) + sizeof(tId)) = (eId); \
|
||||
} while (0)
|
||||
|
||||
#define QW_GET_QTID(id, qId, tId, eId) \
|
||||
do { \
|
||||
(qId) = *(uint64_t *)(id); \
|
||||
(tId) = *(uint64_t *)((char *)(id) + sizeof(qId)); \
|
||||
(eId) = *(int32_t *)((char *)(id) + sizeof(qId) + sizeof(tId)); \
|
||||
#define QW_GET_QTID(id, qId, cId, tId, eId) \
|
||||
do { \
|
||||
(qId) = *(uint64_t *)(id); \
|
||||
(cId) = *(uint64_t *)((char *)(id) + sizeof(qId)); \
|
||||
(tId) = *(uint64_t *)((char *)(id) + sizeof(qId) + sizeof(cId)); \
|
||||
(eId) = *(int32_t *)((char *)(id) + sizeof(qId) + sizeof(cId) + sizeof(tId)); \
|
||||
} while (0)
|
||||
|
||||
#define QW_ERR_RET(c) \
|
||||
|
@ -310,25 +312,31 @@ typedef struct SQWorkerMgmt {
|
|||
#define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%" PRIx64 " " param, mgmt, sId, __VA_ARGS__)
|
||||
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%" PRIx64 " " param, mgmt, sId, __VA_ARGS__)
|
||||
|
||||
#define QW_TASK_ELOG(param, ...) qError("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId, __VA_ARGS__)
|
||||
#define QW_TASK_WLOG(param, ...) qWarn("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId, __VA_ARGS__)
|
||||
#define QW_TASK_DLOG(param, ...) qDebug("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId, __VA_ARGS__)
|
||||
#define QW_TASK_ELOG(param, ...) \
|
||||
qError("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__)
|
||||
#define QW_TASK_WLOG(param, ...) \
|
||||
qWarn("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__)
|
||||
#define QW_TASK_DLOG(param, ...) \
|
||||
qDebug("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__)
|
||||
#define QW_TASK_DLOGL(param, ...) \
|
||||
qDebugL("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId, __VA_ARGS__)
|
||||
qDebugL("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__)
|
||||
|
||||
#define QW_TASK_ELOG_E(param) qError("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId)
|
||||
#define QW_TASK_WLOG_E(param) qWarn("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId)
|
||||
#define QW_TASK_DLOG_E(param) qDebug("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId)
|
||||
#define QW_TASK_ELOG_E(param) \
|
||||
qError("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId)
|
||||
#define QW_TASK_WLOG_E(param) \
|
||||
qWarn("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId)
|
||||
#define QW_TASK_DLOG_E(param) \
|
||||
qDebug("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId)
|
||||
|
||||
#define QW_SCH_TASK_ELOG(param, ...) \
|
||||
qError("QW:%p SID:0x%" PRIx64 ",qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, \
|
||||
__VA_ARGS__)
|
||||
#define QW_SCH_TASK_WLOG(param, ...) \
|
||||
qWarn("QW:%p SID:0x%" PRIx64 ",qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, \
|
||||
__VA_ARGS__)
|
||||
#define QW_SCH_TASK_DLOG(param, ...) \
|
||||
qDebug("QW:%p SID:0x%" PRIx64 ",qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, \
|
||||
__VA_ARGS__)
|
||||
#define QW_SCH_TASK_ELOG(param, ...) \
|
||||
qError("QW:%p SID:0x%" PRIx64 ",qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, \
|
||||
qId, cId, tId, eId, __VA_ARGS__)
|
||||
#define QW_SCH_TASK_WLOG(param, ...) \
|
||||
qWarn("QW:%p SID:0x%" PRIx64 ",qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, \
|
||||
cId, tId, eId, __VA_ARGS__)
|
||||
#define QW_SCH_TASK_DLOG(param, ...) \
|
||||
qDebug("QW:%p SID:0x%" PRIx64 ",qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, \
|
||||
qId, cId, tId, eId, __VA_ARGS__)
|
||||
|
||||
#define QW_LOCK_DEBUG(...) \
|
||||
do { \
|
||||
|
|
|
@ -96,14 +96,14 @@ void qwDbgDumpSchInfo(SQWorker *mgmt, SQWSchStatus *sch, int32_t i) {
|
|||
int32_t taskNum = taosHashGetSize(sch->tasksHash);
|
||||
QW_DLOG("***The %dth scheduler status, hbBrokenTs:%" PRId64 ",taskNum:%d", i, sch->hbBrokenTs, taskNum);
|
||||
|
||||
uint64_t qId, tId;
|
||||
uint64_t qId, cId, tId;
|
||||
int32_t eId;
|
||||
SQWTaskStatus *pTask = NULL;
|
||||
void *pIter = taosHashIterate(sch->tasksHash, NULL);
|
||||
while (pIter) {
|
||||
pTask = (SQWTaskStatus *)pIter;
|
||||
void *key = taosHashGetKey(pIter, NULL);
|
||||
QW_GET_QTID(key, qId, tId, eId);
|
||||
QW_GET_QTID(key, qId, cId, tId, eId);
|
||||
|
||||
QW_TASK_DLOG("job refId:%" PRIx64 ", code:%x, task status:%d", pTask->refId, pTask->code, pTask->status);
|
||||
|
||||
|
@ -118,13 +118,13 @@ void qwDbgDumpTasksInfo(SQWorker *mgmt) {
|
|||
|
||||
int32_t i = 0;
|
||||
SQWTaskCtx *ctx = NULL;
|
||||
uint64_t qId, tId;
|
||||
uint64_t qId, cId, tId;
|
||||
int32_t eId;
|
||||
void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
|
||||
while (pIter) {
|
||||
ctx = (SQWTaskCtx *)pIter;
|
||||
void *key = taosHashGetKey(pIter, NULL);
|
||||
QW_GET_QTID(key, qId, tId, eId);
|
||||
QW_GET_QTID(key, qId, cId, tId, eId);
|
||||
|
||||
QW_TASK_DLOG("%p lock:%x, phase:%d, type:%d, explain:%d, needFetch:%d, localExec:%d, queryMsgType:%d, "
|
||||
"sId:%" PRId64 ", level:%d, queryGotData:%d, queryRsped:%d, queryEnd:%d, queryContinue:%d, queryInQueue:%d, "
|
||||
|
|
|
@ -233,6 +233,7 @@ int32_t qwBuildAndSendDropMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
|
|||
qMsg.header.contLen = 0;
|
||||
qMsg.sId = sId;
|
||||
qMsg.queryId = qId;
|
||||
qMsg.clientId = cId;
|
||||
qMsg.taskId = tId;
|
||||
qMsg.refId = rId;
|
||||
qMsg.execId = eId;
|
||||
|
@ -284,6 +285,7 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
|
|||
req->header.vgId = mgmt->nodeId;
|
||||
req->sId = sId;
|
||||
req->queryId = qId;
|
||||
req->clientId = cId;
|
||||
req->taskId = tId;
|
||||
req->execId = eId;
|
||||
|
||||
|
@ -312,6 +314,7 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
|
|||
qMsg.header.contLen = 0;
|
||||
qMsg.sId = sId;
|
||||
qMsg.queryId = qId;
|
||||
qMsg.clientId = cId;
|
||||
qMsg.taskId = tId;
|
||||
qMsg.refId = rId;
|
||||
qMsg.execId = eId;
|
||||
|
@ -416,6 +419,7 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg, bool chkGran
|
|||
|
||||
uint64_t sId = msg.sId;
|
||||
uint64_t qId = msg.queryId;
|
||||
uint64_t cId = msg.clientId;
|
||||
uint64_t tId = msg.taskId;
|
||||
int64_t rId = msg.refId;
|
||||
int32_t eId = msg.execId;
|
||||
|
@ -447,6 +451,7 @@ int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
|
||||
uint64_t sId = msg.sId;
|
||||
uint64_t qId = msg.queryId;
|
||||
uint64_t cId = msg.clientId;
|
||||
uint64_t tId = msg.taskId;
|
||||
int64_t rId = msg.refId;
|
||||
int32_t eId = msg.execId;
|
||||
|
@ -479,6 +484,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
|
|||
|
||||
uint64_t sId = msg.sId;
|
||||
uint64_t qId = msg.queryId;
|
||||
uint64_t cId = msg.clientId;
|
||||
uint64_t tId = msg.taskId;
|
||||
int64_t rId = msg.refId;
|
||||
int32_t eId = msg.execId;
|
||||
|
@ -524,6 +530,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
|
|||
|
||||
uint64_t sId = msg->sId;
|
||||
uint64_t qId = msg->queryId;
|
||||
uint64_t cId = msg->clientId;
|
||||
uint64_t tId = msg->taskId;
|
||||
int64_t rId = 0;
|
||||
int32_t eId = msg->execId;
|
||||
|
@ -557,6 +564,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
|
|||
|
||||
uint64_t sId = req.sId;
|
||||
uint64_t qId = req.queryId;
|
||||
uint64_t cId = req.clientId;
|
||||
uint64_t tId = req.taskId;
|
||||
int64_t rId = 0;
|
||||
int32_t eId = req.execId;
|
||||
|
@ -604,12 +612,14 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
|
|||
|
||||
msg->sId = be64toh(msg->sId);
|
||||
msg->queryId = be64toh(msg->queryId);
|
||||
msg->clientId = be64toh(msg->clientId);
|
||||
msg->taskId = be64toh(msg->taskId);
|
||||
msg->refId = be64toh(msg->refId);
|
||||
msg->execId = ntohl(msg->execId);
|
||||
|
||||
uint64_t sId = msg->sId;
|
||||
uint64_t qId = msg->queryId;
|
||||
uint64_t cId = msg->clientId;
|
||||
uint64_t tId = msg->taskId;
|
||||
int64_t rId = msg->refId;
|
||||
int32_t eId = msg->execId;
|
||||
|
@ -646,6 +656,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6
|
|||
|
||||
uint64_t sId = msg.sId;
|
||||
uint64_t qId = msg.queryId;
|
||||
uint64_t cId = msg.clientId;
|
||||
uint64_t tId = msg.taskId;
|
||||
int64_t rId = msg.refId;
|
||||
int32_t eId = msg.execId;
|
||||
|
@ -684,6 +695,7 @@ int32_t qWorkerProcessNotifyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
|
|||
|
||||
uint64_t sId = msg.sId;
|
||||
uint64_t qId = msg.queryId;
|
||||
uint64_t cId = msg.clientId;
|
||||
uint64_t tId = msg.taskId;
|
||||
int64_t rId = msg.refId;
|
||||
int32_t eId = msg.execId;
|
||||
|
@ -753,6 +765,7 @@ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SD
|
|||
|
||||
uint64_t sId = req.sId;
|
||||
uint64_t qId = req.queryId;
|
||||
uint64_t cId = req.clientId;
|
||||
uint64_t tId = req.taskId;
|
||||
int64_t rId = 0;
|
||||
int32_t eId = -1;
|
||||
|
|
|
@ -137,8 +137,8 @@ int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchS
|
|||
void qwReleaseScheduler(int32_t rwType, SQWorker *mgmt) { QW_UNLOCK(rwType, &mgmt->schLock); }
|
||||
|
||||
int32_t qwAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, SQWTaskStatus **task) {
|
||||
char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0};
|
||||
QW_SET_QTID(id, qId, tId, eId);
|
||||
char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0};
|
||||
QW_SET_QTID(id, qId, cId, tId, eId);
|
||||
|
||||
QW_LOCK(rwType, &sch->tasksLock);
|
||||
*task = taosHashGet(sch->tasksHash, id, sizeof(id));
|
||||
|
@ -153,8 +153,8 @@ int32_t qwAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, S
|
|||
int32_t qwAddTaskStatusImpl(QW_FPARAMS_DEF, SQWSchStatus *sch, int32_t rwType, int32_t status, SQWTaskStatus **task) {
|
||||
int32_t code = 0;
|
||||
|
||||
char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0};
|
||||
QW_SET_QTID(id, qId, tId, eId);
|
||||
char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0};
|
||||
QW_SET_QTID(id, qId, cId, tId, eId);
|
||||
|
||||
SQWTaskStatus ntask = {0};
|
||||
ntask.status = status;
|
||||
|
@ -209,8 +209,8 @@ int32_t qwAddAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch
|
|||
void qwReleaseTaskStatus(int32_t rwType, SQWSchStatus *sch) { QW_UNLOCK(rwType, &sch->tasksLock); }
|
||||
|
||||
int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
|
||||
char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0};
|
||||
QW_SET_QTID(id, qId, tId, eId);
|
||||
char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0};
|
||||
QW_SET_QTID(id, qId, cId, tId, eId);
|
||||
|
||||
*ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id));
|
||||
if (NULL == (*ctx)) {
|
||||
|
@ -222,8 +222,8 @@ int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
|
|||
}
|
||||
|
||||
int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
|
||||
char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0};
|
||||
QW_SET_QTID(id, qId, tId, eId);
|
||||
char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0};
|
||||
QW_SET_QTID(id, qId, cId, tId, eId);
|
||||
|
||||
*ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
|
||||
if (NULL == (*ctx)) {
|
||||
|
@ -235,8 +235,8 @@ int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
|
|||
}
|
||||
|
||||
int32_t qwAddTaskCtxImpl(QW_FPARAMS_DEF, bool acquire, SQWTaskCtx **ctx) {
|
||||
char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0};
|
||||
QW_SET_QTID(id, qId, tId, eId);
|
||||
char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0};
|
||||
QW_SET_QTID(id, qId, cId, tId, eId);
|
||||
|
||||
SQWTaskCtx nctx = {0};
|
||||
|
||||
|
@ -347,6 +347,7 @@ int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
|
|||
(void)memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo));
|
||||
localRsp.rsp.subplanInfo = pExec;
|
||||
localRsp.qId = qId;
|
||||
localRsp.cId = cId;
|
||||
localRsp.tId = tId;
|
||||
localRsp.rId = rId;
|
||||
localRsp.eId = eId;
|
||||
|
@ -376,8 +377,8 @@ _return:
|
|||
|
||||
|
||||
int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
|
||||
char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0};
|
||||
QW_SET_QTID(id, qId, tId, eId);
|
||||
char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0};
|
||||
QW_SET_QTID(id, qId, cId, tId, eId);
|
||||
SQWTaskCtx octx;
|
||||
|
||||
SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
|
||||
|
@ -411,8 +412,8 @@ int32_t qwDropTaskStatus(QW_FPARAMS_DEF) {
|
|||
SQWTaskStatus *task = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0};
|
||||
QW_SET_QTID(id, qId, tId, eId);
|
||||
char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0};
|
||||
QW_SET_QTID(id, qId, cId, tId, eId);
|
||||
|
||||
if (qwAcquireScheduler(mgmt, sId, QW_WRITE, &sch)) {
|
||||
QW_TASK_WLOG_E("scheduler does not exist");
|
||||
|
@ -465,8 +466,8 @@ _return:
|
|||
|
||||
|
||||
int32_t qwHandleDynamicTaskEnd(QW_FPARAMS_DEF) {
|
||||
char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0};
|
||||
QW_SET_QTID(id, qId, tId, eId);
|
||||
char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0};
|
||||
QW_SET_QTID(id, qId, cId, tId, eId);
|
||||
SQWTaskCtx octx;
|
||||
|
||||
SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
|
||||
|
@ -588,14 +589,14 @@ void qwDestroyImpl(void *pMgmt) {
|
|||
mgmt->hbTimer = NULL;
|
||||
taosTmrCleanUp(mgmt->timer);
|
||||
|
||||
uint64_t qId, tId;
|
||||
uint64_t qId, cId, tId;
|
||||
int32_t eId;
|
||||
void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
|
||||
|
||||
while (pIter) {
|
||||
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
|
||||
void *key = taosHashGetKey(pIter, NULL);
|
||||
QW_GET_QTID(key, qId, tId, eId);
|
||||
QW_GET_QTID(key, qId, cId, tId, eId);
|
||||
|
||||
qwFreeTaskCtx(ctx);
|
||||
QW_TASK_DLOG_E("task ctx freed");
|
||||
|
|
|
@ -19,7 +19,7 @@ SQWorkerMgmt gQwMgmt = {
|
|||
};
|
||||
|
||||
void qwStopAllTasks(SQWorker *mgmt) {
|
||||
uint64_t qId, tId, sId;
|
||||
uint64_t qId, cId, tId, sId;
|
||||
int32_t eId;
|
||||
int64_t rId = 0;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -28,7 +28,7 @@ void qwStopAllTasks(SQWorker *mgmt) {
|
|||
while (pIter) {
|
||||
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
|
||||
void *key = taosHashGetKey(pIter, NULL);
|
||||
QW_GET_QTID(key, qId, tId, eId);
|
||||
QW_GET_QTID(key, qId, cId, tId, eId);
|
||||
|
||||
QW_LOCK(QW_WRITE, &ctx->lock);
|
||||
|
||||
|
@ -288,7 +288,7 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo)
|
|||
|
||||
// TODO GET EXECUTOR API TO GET MORE INFO
|
||||
|
||||
QW_GET_QTID(key, status.queryId, status.taskId, status.execId);
|
||||
QW_GET_QTID(key, status.queryId, status.clientId, status.taskId, status.execId);
|
||||
status.status = taskStatus->status;
|
||||
status.refId = taskStatus->refId;
|
||||
|
||||
|
@ -1473,8 +1473,8 @@ int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pSt
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId,
|
||||
SQWMsg *qwMsg, SArray *explainRes) {
|
||||
int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t cId, uint64_t tId, int64_t rId,
|
||||
int32_t eId, SQWMsg *qwMsg, SArray *explainRes) {
|
||||
SQWorker *mgmt = (SQWorker *)pMgmt;
|
||||
int32_t code = 0;
|
||||
SQWTaskCtx *ctx = NULL;
|
||||
|
@ -1538,8 +1538,8 @@ _return:
|
|||
QW_RET(code);
|
||||
}
|
||||
|
||||
int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId,
|
||||
void **pRsp, SArray *explainRes) {
|
||||
int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t cId, uint64_t tId, int64_t rId,
|
||||
int32_t eId, void **pRsp, SArray *explainRes) {
|
||||
SQWorker *mgmt = (SQWorker *)pMgmt;
|
||||
int32_t code = 0;
|
||||
int32_t dataLen = 0;
|
||||
|
|
|
@ -142,8 +142,9 @@ typedef struct SSchedulerCfg {
|
|||
} SSchedulerCfg;
|
||||
|
||||
typedef struct SSchedulerMgmt {
|
||||
uint64_t taskId; // sequential taksId
|
||||
uint64_t sId; // schedulerId
|
||||
uint64_t clientId; // unique clientId
|
||||
uint64_t taskId; // sequential taksId
|
||||
uint64_t sId; // schedulerId
|
||||
SSchedulerCfg cfg;
|
||||
bool exit;
|
||||
int32_t jobRef;
|
||||
|
@ -163,6 +164,7 @@ typedef struct SSchTaskCallbackParam {
|
|||
SSchCallbackParamHeader head;
|
||||
uint64_t queryId;
|
||||
int64_t refId;
|
||||
uint64_t clientId;
|
||||
uint64_t taskId;
|
||||
int32_t execId;
|
||||
void *pTrans;
|
||||
|
@ -222,6 +224,7 @@ typedef struct SSchTimerParam {
|
|||
} SSchTimerParam;
|
||||
|
||||
typedef struct SSchTask {
|
||||
uint64_t clientId; // current client id
|
||||
uint64_t taskId; // task id
|
||||
SRWLatch lock; // task reentrant lock
|
||||
int32_t maxExecTimes; // task max exec times
|
||||
|
@ -329,6 +332,7 @@ extern SSchedulerMgmt schMgmt;
|
|||
#define SCH_LOCK_TASK(_task) SCH_LOCK(SCH_WRITE, &(_task)->lock)
|
||||
#define SCH_UNLOCK_TASK(_task) SCH_UNLOCK(SCH_WRITE, &(_task)->lock)
|
||||
|
||||
#define SCH_CLIENT_ID(_task) ((_task) ? (_task)->clientId : -1)
|
||||
#define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1)
|
||||
#define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1)
|
||||
|
||||
|
@ -449,21 +453,21 @@ extern SSchedulerMgmt schMgmt;
|
|||
#define SCH_JOB_ELOG(param, ...) qError("qid:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
|
||||
#define SCH_JOB_DLOG(param, ...) qDebug("qid:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
|
||||
|
||||
#define SCH_TASK_ELOG(param, ...) \
|
||||
qError("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \
|
||||
__VA_ARGS__)
|
||||
#define SCH_TASK_DLOG(param, ...) \
|
||||
qDebug("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \
|
||||
__VA_ARGS__)
|
||||
#define SCH_TASK_TLOG(param, ...) \
|
||||
qTrace("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \
|
||||
__VA_ARGS__)
|
||||
#define SCH_TASK_DLOGL(param, ...) \
|
||||
qDebugL("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \
|
||||
__VA_ARGS__)
|
||||
#define SCH_TASK_WLOG(param, ...) \
|
||||
qWarn("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \
|
||||
__VA_ARGS__)
|
||||
#define SCH_TASK_ELOG(param, ...) \
|
||||
qError("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
|
||||
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
|
||||
#define SCH_TASK_DLOG(param, ...) \
|
||||
qDebug("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
|
||||
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
|
||||
#define SCH_TASK_TLOG(param, ...) \
|
||||
qTrace("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
|
||||
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
|
||||
#define SCH_TASK_DLOGL(param, ...) \
|
||||
qDebugL("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
|
||||
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
|
||||
#define SCH_TASK_WLOG(param, ...) \
|
||||
qWarn("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
|
||||
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
|
||||
|
||||
#define SCH_SET_ERRNO(_err) \
|
||||
do { \
|
||||
|
|
|
@ -500,8 +500,8 @@ _return:
|
|||
|
||||
int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) {
|
||||
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
|
||||
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x", pParam->queryId, pParam->taskId,
|
||||
code);
|
||||
qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x", pParam->queryId,
|
||||
pParam->clientId, pParam->taskId, code);
|
||||
// called if drop task rsp received code
|
||||
(void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); // ignore error
|
||||
|
||||
|
@ -517,8 +517,8 @@ int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) {
|
|||
|
||||
int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code) {
|
||||
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
|
||||
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " task notify rsp received, code:0x%x", pParam->queryId, pParam->taskId,
|
||||
code);
|
||||
qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 " task notify rsp received, code:0x%x", pParam->queryId,
|
||||
pParam->clientId, pParam->taskId, code);
|
||||
if (pMsg) {
|
||||
taosMemoryFree(pMsg->pData);
|
||||
taosMemoryFree(pMsg->pEpSet);
|
||||
|
@ -595,6 +595,7 @@ int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, int32_t msgType, bo
|
|||
|
||||
param->queryId = pJob->queryId;
|
||||
param->refId = pJob->refId;
|
||||
param->clientId = SCH_CLIENT_ID(pTask);
|
||||
param->taskId = SCH_TASK_ID(pTask);
|
||||
param->pTrans = pJob->conn.pTrans;
|
||||
param->execId = pTask->execId;
|
||||
|
@ -1138,6 +1139,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
|||
req.header.vgId = addr->nodeId;
|
||||
req.sId = schMgmt.sId;
|
||||
req.queryId = pJob->queryId;
|
||||
req.clientId = pTask->clientId;
|
||||
req.taskId = pTask->taskId;
|
||||
req.phyLen = pTask->msgLen;
|
||||
req.sqlLen = strlen(pJob->sql);
|
||||
|
@ -1171,6 +1173,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
|||
qMsg.header.contLen = 0;
|
||||
qMsg.sId = schMgmt.sId;
|
||||
qMsg.queryId = pJob->queryId;
|
||||
qMsg.clientId = pTask->clientId;
|
||||
qMsg.taskId = pTask->taskId;
|
||||
qMsg.refId = pJob->refId;
|
||||
qMsg.execId = pTask->execId;
|
||||
|
@ -1226,6 +1229,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
|||
req.header.vgId = addr->nodeId;
|
||||
req.sId = schMgmt.sId;
|
||||
req.queryId = pJob->queryId;
|
||||
req.clientId = pTask->clientId;
|
||||
req.taskId = pTask->taskId;
|
||||
req.execId = pTask->execId;
|
||||
|
||||
|
@ -1253,6 +1257,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
|||
qMsg.header.contLen = 0;
|
||||
qMsg.sId = schMgmt.sId;
|
||||
qMsg.queryId = pJob->queryId;
|
||||
qMsg.clientId = pTask->clientId;
|
||||
qMsg.taskId = pTask->taskId;
|
||||
qMsg.refId = pJob->refId;
|
||||
qMsg.execId = *(int32_t*)param;
|
||||
|
@ -1310,6 +1315,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
|||
qMsg.header.contLen = 0;
|
||||
qMsg.sId = schMgmt.sId;
|
||||
qMsg.queryId = pJob->queryId;
|
||||
qMsg.clientId = pTask->clientId;
|
||||
qMsg.taskId = pTask->taskId;
|
||||
qMsg.refId = pJob->refId;
|
||||
qMsg.execId = pTask->execId;
|
||||
|
|
|
@ -66,6 +66,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
|
|||
pTask->execId = -1;
|
||||
pTask->failedExecId = -2;
|
||||
pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC;
|
||||
pTask->clientId = getClientId();
|
||||
pTask->taskId = schGenTaskId();
|
||||
|
||||
schInitTaskRetryTimes(pJob, pTask, pLevel);
|
||||
|
@ -305,6 +306,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
|
|||
SCH_LOCK(SCH_WRITE, &parent->planLock);
|
||||
SDownstreamSourceNode source = {
|
||||
.type = QUERY_NODE_DOWNSTREAM_SOURCE,
|
||||
.clientId = pTask->clientId,
|
||||
.taskId = pTask->taskId,
|
||||
.schedId = schMgmt.sId,
|
||||
.execId = pTask->execId,
|
||||
|
@ -996,8 +998,8 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
|
|||
|
||||
int32_t code = 0;
|
||||
|
||||
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", pStatus->queryId, pStatus->taskId,
|
||||
pStatus->execId, jobTaskStatusStr(pStatus->status));
|
||||
qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", pStatus->queryId,
|
||||
pStatus->clientId, pStatus->taskId, pStatus->execId, jobTaskStatusStr(pStatus->status));
|
||||
|
||||
if (schProcessOnCbBegin(&pJob, &pTask, pStatus->queryId, pStatus->refId, pStatus->taskId)) {
|
||||
continue;
|
||||
|
@ -1043,13 +1045,14 @@ int32_t schHandleExplainRes(SArray *pExplainRes) {
|
|||
continue;
|
||||
}
|
||||
|
||||
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg", localRsp->qId, localRsp->tId);
|
||||
qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg",
|
||||
localRsp->qId, localRsp->cId, localRsp->tId);
|
||||
|
||||
pJob = NULL;
|
||||
(void)schAcquireJob(localRsp->rId, &pJob);
|
||||
if (NULL == pJob) {
|
||||
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, localRsp->qId,
|
||||
localRsp->tId, localRsp->rId);
|
||||
qWarn("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64,
|
||||
localRsp->qId, localRsp->cId, localRsp->tId, localRsp->rId);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_JOB_NOT_EXIST);
|
||||
}
|
||||
|
||||
|
@ -1068,8 +1071,8 @@ int32_t schHandleExplainRes(SArray *pExplainRes) {
|
|||
|
||||
(void)schReleaseJob(pJob->refId);
|
||||
|
||||
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x", localRsp->qId,
|
||||
localRsp->tId, code);
|
||||
qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x",
|
||||
localRsp->qId, localRsp->cId, localRsp->tId, code);
|
||||
|
||||
SCH_ERR_JRET(code);
|
||||
|
||||
|
@ -1147,8 +1150,8 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
|
|||
}
|
||||
}
|
||||
|
||||
SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
|
||||
pTask->execId, &qwMsg, explainRes));
|
||||
SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->clientId, pTask->taskId,
|
||||
pJob->refId, pTask->execId, &qwMsg, explainRes));
|
||||
|
||||
if (SCH_IS_EXPLAIN_JOB(pJob)) {
|
||||
SCH_ERR_RET(schHandleExplainRes(explainRes));
|
||||
|
@ -1407,8 +1410,8 @@ int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) {
|
|||
}
|
||||
}
|
||||
|
||||
SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
|
||||
pTask->execId, &pRsp, explainRes));
|
||||
SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->clientId, pTask->taskId,
|
||||
pJob->refId, pTask->execId, &pRsp, explainRes));
|
||||
|
||||
if (SCH_IS_EXPLAIN_JOB(pJob)) {
|
||||
SCH_ERR_RET(schHandleExplainRes(explainRes));
|
||||
|
|
|
@ -293,6 +293,18 @@ void schCloseJobRef(void) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t initClientId(void) {
|
||||
int32_t code = taosGetSystemUUIDU64(&schMgmt.clientId);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("failed to generate clientId since %s", tstrerror(code));
|
||||
SCH_ERR_RET(code);
|
||||
}
|
||||
qInfo("initialize");
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
uint64_t getClientId(void) { return schMgmt.clientId; }
|
||||
|
||||
uint64_t schGenTaskId(void) { return atomic_add_fetch_64(&schMgmt.taskId, 1); }
|
||||
|
||||
#ifdef BUILD_NO_CALL
|
||||
|
|
|
@ -81,6 +81,7 @@ static const char *am_pm[2] = {"AM", "PM"};
|
|||
#endif
|
||||
|
||||
char *taosStrpTime(const char *buf, const char *fmt, struct tm *tm) {
|
||||
if (!buf || !fmt || !tm) return NULL;
|
||||
#ifdef WINDOWS
|
||||
char c;
|
||||
const char *bp;
|
||||
|
@ -345,6 +346,9 @@ char *taosStrpTime(const char *buf, const char *fmt, struct tm *tm) {
|
|||
}
|
||||
|
||||
int32_t taosGetTimeOfDay(struct timeval *tv) {
|
||||
if (tv == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
int32_t code = 0;
|
||||
#ifdef WINDOWS
|
||||
LARGE_INTEGER t;
|
||||
|
@ -365,12 +369,15 @@ int32_t taosGetTimeOfDay(struct timeval *tv) {
|
|||
#endif
|
||||
}
|
||||
|
||||
time_t taosTime(time_t *t) {
|
||||
int32_t taosTime(time_t *t) {
|
||||
if (t == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
time_t r = time(t);
|
||||
if (r == (time_t)-1) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return TAOS_SYSTEM_ERROR(errno);
|
||||
}
|
||||
return r;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
@ -154,16 +154,26 @@ static int32_t taosStartLog() {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void getDay(char *buf, int32_t bufSize) {
|
||||
time_t t = taosTime(NULL);
|
||||
static int32_t getDay(char *buf, int32_t bufSize) {
|
||||
time_t t;
|
||||
int32_t code = taosTime(&t);
|
||||
if(code != 0) {
|
||||
return code;
|
||||
}
|
||||
struct tm tmInfo;
|
||||
if (taosLocalTime(&t, &tmInfo, buf, bufSize) != NULL) {
|
||||
TAOS_UNUSED(strftime(buf, bufSize, "%Y-%m-%d", &tmInfo));
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int64_t getTimestampToday() {
|
||||
time_t t = taosTime(NULL);
|
||||
time_t t;
|
||||
int32_t code = taosTime(&t);
|
||||
if (code != 0) {
|
||||
uError("failed to get time, reason:%s", tstrerror(code));
|
||||
return 0;
|
||||
}
|
||||
struct tm tm;
|
||||
if (taosLocalTime(&t, &tm, NULL, 0) == NULL) {
|
||||
return 0;
|
||||
|
@ -203,7 +213,11 @@ int32_t taosInitSlowLog() {
|
|||
|
||||
char name[PATH_MAX + TD_TIME_STR_LEN] = {0};
|
||||
char day[TD_TIME_STR_LEN] = {0};
|
||||
getDay(day, sizeof(day));
|
||||
int32_t code = getDay(day, sizeof(day));
|
||||
if (code != 0) {
|
||||
(void)printf("failed to get day, reason:%s\n", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
(void)snprintf(name, PATH_MAX + TD_TIME_STR_LEN, "%s.%s", tsLogObj.slowLogName, day);
|
||||
|
||||
tsLogObj.timestampToday = getTimestampToday();
|
||||
|
@ -434,7 +448,12 @@ static void taosOpenNewSlowLogFile() {
|
|||
atomic_store_32(&tsLogObj.slowHandle->lock, 0);
|
||||
|
||||
char day[TD_TIME_STR_LEN] = {0};
|
||||
getDay(day, sizeof(day));
|
||||
int32_t code = getDay(day, sizeof(day));
|
||||
if (code != 0) {
|
||||
uError("failed to get day, reason:%s", tstrerror(code));
|
||||
(void)taosThreadMutexUnlock(&tsLogObj.logMutex);
|
||||
return;
|
||||
}
|
||||
TdFilePtr pFile = NULL;
|
||||
char name[PATH_MAX + TD_TIME_STR_LEN] = {0};
|
||||
(void)snprintf(name, PATH_MAX + TD_TIME_STR_LEN, "%s.%s", tsLogObj.slowLogName, day);
|
||||
|
|
|
@ -0,0 +1,406 @@
|
|||
// sample code to verify multiple queries with the same reqid
|
||||
// to compile: gcc -o sameReqdiTest sameReqidTest.c -ltaos
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <unistd.h>
|
||||
#include "taos.h"
|
||||
|
||||
#define NUM_ROUNDS 10
|
||||
#define CONST_REQ_ID 12345
|
||||
#define TEST_DB "test"
|
||||
|
||||
#define CHECK_CONDITION(condition, prompt, errstr) \
|
||||
do { \
|
||||
if (!(condition)) { \
|
||||
printf("\033[31m[%s:%d] failed to " prompt ", reason: %s\033[0m\n", __func__, __LINE__, errstr); \
|
||||
exit(EXIT_FAILURE); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define CHECK_RES(res, prompt) CHECK_CONDITION(taos_errno(res) == 0, prompt, taos_errstr(res))
|
||||
#define CHECK_CODE(code, prompt) CHECK_CONDITION(code == 0, prompt, taos_errstr(NULL))
|
||||
|
||||
static TAOS* getNewConnection() {
|
||||
const char* host = "127.0.0.1";
|
||||
const char* user = "root";
|
||||
const char* passwd = "taosdata";
|
||||
TAOS* taos = NULL;
|
||||
|
||||
taos_options(TSDB_OPTION_TIMEZONE, "GMT-8");
|
||||
taos = taos_connect(host, user, passwd, "", 0);
|
||||
CHECK_CONDITION(taos != NULL, "connect to db", taos_errstr(NULL));
|
||||
return taos;
|
||||
}
|
||||
|
||||
static void prepareData(TAOS* taos) {
|
||||
TAOS_RES* res = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
res = taos_query(taos, "create database if not exists " TEST_DB " precision 'ns'");
|
||||
CHECK_RES(res, "create database");
|
||||
taos_free_result(res);
|
||||
usleep(100000);
|
||||
|
||||
code = taos_select_db(taos, TEST_DB);
|
||||
CHECK_CODE(code, "switch to database");
|
||||
|
||||
res = taos_query(taos, "create table if not exists meters(ts timestamp, a int) tags(area int)");
|
||||
CHECK_RES(res, "create stable meters");
|
||||
taos_free_result(res);
|
||||
|
||||
res = taos_query(taos, "create table if not exists t0 using meters tags(0)");
|
||||
CHECK_RES(res, "create table t0");
|
||||
taos_free_result(res);
|
||||
|
||||
res = taos_query(taos, "create table if not exists t1 using meters tags(1)");
|
||||
CHECK_RES(res, "create table t1");
|
||||
taos_free_result(res);
|
||||
|
||||
res = taos_query(taos, "create table if not exists t2 using meters tags(2)");
|
||||
CHECK_RES(res, "create table t2");
|
||||
taos_free_result(res);
|
||||
|
||||
res = taos_query(taos, "create table if not exists t3 using meters tags(3)");
|
||||
CHECK_RES(res, "create table t3");
|
||||
taos_free_result(res);
|
||||
|
||||
res = taos_query(taos, "create table if not exists t4 using meters tags(4)");
|
||||
CHECK_RES(res, "create table t4");
|
||||
taos_free_result(res);
|
||||
|
||||
res = taos_query(taos, "create table if not exists t5 using meters tags(5)");
|
||||
CHECK_RES(res, "create table t5");
|
||||
taos_free_result(res);
|
||||
|
||||
res = taos_query(taos, "create table if not exists t6 using meters tags(6)");
|
||||
CHECK_RES(res, "create table t6");
|
||||
taos_free_result(res);
|
||||
|
||||
res = taos_query(taos, "create table if not exists t7 using meters tags(7)");
|
||||
CHECK_RES(res, "create table t7");
|
||||
taos_free_result(res);
|
||||
|
||||
res = taos_query(taos, "create table if not exists t8 using meters tags(8)");
|
||||
CHECK_RES(res, "create table t8");
|
||||
taos_free_result(res);
|
||||
|
||||
res = taos_query(taos, "create table if not exists t9 using meters tags(9)");
|
||||
CHECK_RES(res, "create table t9");
|
||||
taos_free_result(res);
|
||||
|
||||
res = taos_query(taos,
|
||||
"insert into t0 values('2020-01-01 00:00:00.000', 0)"
|
||||
" ('2020-01-01 00:01:00.000', 0)"
|
||||
" ('2020-01-01 00:02:00.000', 0)"
|
||||
" t1 values('2020-01-01 00:00:00.000', 1)"
|
||||
" ('2020-01-01 00:01:00.000', 1)"
|
||||
" ('2020-01-01 00:02:00.000', 1)"
|
||||
" ('2020-01-01 00:03:00.000', 1)"
|
||||
" t2 values('2020-01-01 00:00:00.000', 2)"
|
||||
" ('2020-01-01 00:01:00.000', 2)"
|
||||
" ('2020-01-01 00:01:01.000', 2)"
|
||||
" ('2020-01-01 00:01:02.000', 2)"
|
||||
" t3 values('2020-01-01 00:01:02.000', 3)"
|
||||
" t4 values('2020-01-01 00:01:02.000', 4)"
|
||||
" t5 values('2020-01-01 00:01:02.000', 5)"
|
||||
" t6 values('2020-01-01 00:01:02.000', 6)"
|
||||
" t7 values('2020-01-01 00:01:02.000', 7)"
|
||||
" t8 values('2020-01-01 00:01:02.000', 8)"
|
||||
" t9 values('2020-01-01 00:01:02.000', 9)");
|
||||
CHECK_RES(res, "insert into meters");
|
||||
CHECK_CONDITION(taos_affected_rows(res), "insert into meters", "insufficient count");
|
||||
taos_free_result(res);
|
||||
|
||||
res = taos_query(
|
||||
taos,
|
||||
"create table if not exists m1 (ts timestamp, b bool, v1 tinyint, v2 smallint, v4 int, v8 bigint, f4 float, f8 "
|
||||
"double, bin binary(40), blob nchar(10))");
|
||||
CHECK_RES(res, "create table m1");
|
||||
taos_free_result(res);
|
||||
|
||||
usleep(1000000);
|
||||
}
|
||||
|
||||
static void verifySchemaLess(TAOS* taos) {
|
||||
TAOS_RES* res = NULL;
|
||||
char* lines[] = {
|
||||
"st,t1=3i64,t2=4f64,t3=L\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000",
|
||||
"st,t1=4i64,t3=L\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000",
|
||||
"st,t2=5f64,t3=L\"ste\" c1=4i64,c2=true,c3=L\"iam\" 1626056811823316532",
|
||||
"st,t1=4i64,t2=5f64,t3=L\"t4\" c1=3i64,c3=L\"passitagain\",c2=true,c4=5f64 1626006833642000000",
|
||||
"st,t2=5f64,t3=L\"ste2\" c3=L\"iamszhou\",c2=false 1626056811843316532",
|
||||
"st,t2=5f64,t3=L\"ste2\" c3=L\"iamszhou\",c2=false,c5=5f64,c6=7u64,c7=32i32,c8=88.88f32 1626056812843316532",
|
||||
"st,t1=4i64,t3=L\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64,c6=7u64 "
|
||||
"1626006933640000000",
|
||||
"st,t1=4i64,t3=L\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64,c6=7u64 "
|
||||
"1626006933640000000",
|
||||
"st,t1=4i64,t3=L\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 "
|
||||
"1626006933641000000"};
|
||||
|
||||
res = taos_schemaless_insert_with_reqid(taos, lines, sizeof(lines) / sizeof(char*), TSDB_SML_LINE_PROTOCOL,
|
||||
TSDB_SML_TIMESTAMP_NANO_SECONDS, CONST_REQ_ID);
|
||||
CHECK_RES(res, "insert schema-less data");
|
||||
printf("successfully inserted %d rows\n", taos_affected_rows(res));
|
||||
taos_free_result(res);
|
||||
}
|
||||
|
||||
static int32_t printResult(TAOS_RES* res, int32_t nlimit) {
|
||||
TAOS_ROW row = NULL;
|
||||
TAOS_FIELD* fields = NULL;
|
||||
int32_t numFields = 0;
|
||||
int32_t nRows = 0;
|
||||
|
||||
numFields = taos_num_fields(res);
|
||||
fields = taos_fetch_fields(res);
|
||||
while ((nlimit-- > 0) && (row = taos_fetch_row(res))) {
|
||||
char temp[256] = {0};
|
||||
taos_print_row(temp, row, fields, numFields);
|
||||
puts(temp);
|
||||
nRows++;
|
||||
}
|
||||
return nRows;
|
||||
}
|
||||
|
||||
static void verifyQuery(TAOS* taos) {
|
||||
TAOS_RES* res = NULL;
|
||||
|
||||
res = taos_query_with_reqid(taos, "select * from meters", CONST_REQ_ID);
|
||||
CHECK_RES(res, "select from meters");
|
||||
printResult(res, INT32_MAX);
|
||||
taos_free_result(res);
|
||||
|
||||
res = taos_query_with_reqid(taos, "select * from t0", CONST_REQ_ID);
|
||||
CHECK_RES(res, "select from t0");
|
||||
printResult(res, INT32_MAX);
|
||||
taos_free_result(res);
|
||||
|
||||
res = taos_query_with_reqid(taos, "select * from t1", CONST_REQ_ID);
|
||||
CHECK_RES(res, "select from t1");
|
||||
printResult(res, INT32_MAX);
|
||||
taos_free_result(res);
|
||||
|
||||
res = taos_query_with_reqid(taos, "select * from t2", CONST_REQ_ID);
|
||||
CHECK_RES(res, "select from t2");
|
||||
printResult(res, INT32_MAX);
|
||||
taos_free_result(res);
|
||||
|
||||
res = taos_query_with_reqid(taos, "select * from t3", CONST_REQ_ID);
|
||||
CHECK_RES(res, "select from t3");
|
||||
printResult(res, INT32_MAX);
|
||||
taos_free_result(res);
|
||||
|
||||
printf("succeed to read from meters\n");
|
||||
}
|
||||
|
||||
void retrieveCallback(void* param, TAOS_RES* res, int32_t nrows) {
|
||||
if (nrows == 0) {
|
||||
taos_free_result(res);
|
||||
} else {
|
||||
printResult(res, nrows);
|
||||
taos_fetch_rows_a(res, retrieveCallback, param);
|
||||
}
|
||||
}
|
||||
|
||||
void selectCallback(void* param, TAOS_RES* res, int32_t code) {
|
||||
CHECK_CODE(code, "read async from table");
|
||||
taos_fetch_rows_a(res, retrieveCallback, param);
|
||||
}
|
||||
|
||||
static void verifyQueryAsync(TAOS* taos) {
|
||||
taos_query_a_with_reqid(taos, "select *from meters", selectCallback, NULL, CONST_REQ_ID);
|
||||
taos_query_a_with_reqid(taos, "select *from t0", selectCallback, NULL, CONST_REQ_ID);
|
||||
taos_query_a_with_reqid(taos, "select *from t1", selectCallback, NULL, CONST_REQ_ID);
|
||||
taos_query_a_with_reqid(taos, "select *from t2", selectCallback, NULL, CONST_REQ_ID);
|
||||
taos_query_a_with_reqid(taos, "select *from t3", selectCallback, NULL, CONST_REQ_ID);
|
||||
|
||||
sleep(1);
|
||||
}
|
||||
|
||||
void veriryStmt(TAOS* taos) {
|
||||
// insert 10 records
|
||||
struct {
|
||||
int64_t ts[10];
|
||||
int8_t b[10];
|
||||
int8_t v1[10];
|
||||
int16_t v2[10];
|
||||
int32_t v4[10];
|
||||
int64_t v8[10];
|
||||
float f4[10];
|
||||
double f8[10];
|
||||
char bin[10][40];
|
||||
char blob[10][80];
|
||||
} v;
|
||||
|
||||
int32_t* t8_len = malloc(sizeof(int32_t) * 10);
|
||||
int32_t* t16_len = malloc(sizeof(int32_t) * 10);
|
||||
int32_t* t32_len = malloc(sizeof(int32_t) * 10);
|
||||
int32_t* t64_len = malloc(sizeof(int32_t) * 10);
|
||||
int32_t* float_len = malloc(sizeof(int32_t) * 10);
|
||||
int32_t* double_len = malloc(sizeof(int32_t) * 10);
|
||||
int32_t* bin_len = malloc(sizeof(int32_t) * 10);
|
||||
int32_t* blob_len = malloc(sizeof(int32_t) * 10);
|
||||
|
||||
TAOS_STMT* stmt = taos_stmt_init_with_reqid(taos, CONST_REQ_ID);
|
||||
TAOS_MULTI_BIND params[10];
|
||||
char is_null[10] = {0};
|
||||
|
||||
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
params[0].buffer_length = sizeof(v.ts[0]);
|
||||
params[0].buffer = v.ts;
|
||||
params[0].length = t64_len;
|
||||
params[0].is_null = is_null;
|
||||
params[0].num = 10;
|
||||
|
||||
params[1].buffer_type = TSDB_DATA_TYPE_BOOL;
|
||||
params[1].buffer_length = sizeof(v.b[0]);
|
||||
params[1].buffer = v.b;
|
||||
params[1].length = t8_len;
|
||||
params[1].is_null = is_null;
|
||||
params[1].num = 10;
|
||||
|
||||
params[2].buffer_type = TSDB_DATA_TYPE_TINYINT;
|
||||
params[2].buffer_length = sizeof(v.v1[0]);
|
||||
params[2].buffer = v.v1;
|
||||
params[2].length = t8_len;
|
||||
params[2].is_null = is_null;
|
||||
params[2].num = 10;
|
||||
|
||||
params[3].buffer_type = TSDB_DATA_TYPE_SMALLINT;
|
||||
params[3].buffer_length = sizeof(v.v2[0]);
|
||||
params[3].buffer = v.v2;
|
||||
params[3].length = t16_len;
|
||||
params[3].is_null = is_null;
|
||||
params[3].num = 10;
|
||||
|
||||
params[4].buffer_type = TSDB_DATA_TYPE_INT;
|
||||
params[4].buffer_length = sizeof(v.v4[0]);
|
||||
params[4].buffer = v.v4;
|
||||
params[4].length = t32_len;
|
||||
params[4].is_null = is_null;
|
||||
params[4].num = 10;
|
||||
|
||||
params[5].buffer_type = TSDB_DATA_TYPE_BIGINT;
|
||||
params[5].buffer_length = sizeof(v.v8[0]);
|
||||
params[5].buffer = v.v8;
|
||||
params[5].length = t64_len;
|
||||
params[5].is_null = is_null;
|
||||
params[5].num = 10;
|
||||
|
||||
params[6].buffer_type = TSDB_DATA_TYPE_FLOAT;
|
||||
params[6].buffer_length = sizeof(v.f4[0]);
|
||||
params[6].buffer = v.f4;
|
||||
params[6].length = float_len;
|
||||
params[6].is_null = is_null;
|
||||
params[6].num = 10;
|
||||
|
||||
params[7].buffer_type = TSDB_DATA_TYPE_DOUBLE;
|
||||
params[7].buffer_length = sizeof(v.f8[0]);
|
||||
params[7].buffer = v.f8;
|
||||
params[7].length = double_len;
|
||||
params[7].is_null = is_null;
|
||||
params[7].num = 10;
|
||||
|
||||
params[8].buffer_type = TSDB_DATA_TYPE_BINARY;
|
||||
params[8].buffer_length = sizeof(v.bin[0]);
|
||||
params[8].buffer = v.bin;
|
||||
params[8].length = bin_len;
|
||||
params[8].is_null = is_null;
|
||||
params[8].num = 10;
|
||||
|
||||
params[9].buffer_type = TSDB_DATA_TYPE_NCHAR;
|
||||
params[9].buffer_length = sizeof(v.blob[0]);
|
||||
params[9].buffer = v.blob;
|
||||
params[9].length = blob_len;
|
||||
params[9].is_null = is_null;
|
||||
params[9].num = 10;
|
||||
|
||||
int32_t code = taos_stmt_prepare(
|
||||
stmt, "insert into ? (ts, b, v1, v2, v4, v8, f4, f8, bin, blob) values(?,?,?,?,?,?,?,?,?,?)", 0);
|
||||
CHECK_CODE(code, "taos_stmt_prepare");
|
||||
|
||||
code = taos_stmt_set_tbname(stmt, "m1");
|
||||
CHECK_CODE(code, "taos_stmt_set_tbname");
|
||||
|
||||
int64_t ts = 1591060628000000000;
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
v.ts[i] = ts;
|
||||
ts += 1000000;
|
||||
is_null[i] = 0;
|
||||
|
||||
v.b[i] = (int8_t)i % 2;
|
||||
v.v1[i] = (int8_t)i;
|
||||
v.v2[i] = (int16_t)(i * 2);
|
||||
v.v4[i] = (int32_t)(i * 4);
|
||||
v.v8[i] = (int64_t)(i * 8);
|
||||
v.f4[i] = (float)(i * 40);
|
||||
v.f8[i] = (double)(i * 80);
|
||||
for (int j = 0; j < sizeof(v.bin[0]); ++j) {
|
||||
v.bin[i][j] = (char)(i + '0');
|
||||
}
|
||||
strcpy(v.blob[i], "一二三四五六七八九十");
|
||||
|
||||
t8_len[i] = sizeof(int8_t);
|
||||
t16_len[i] = sizeof(int16_t);
|
||||
t32_len[i] = sizeof(int32_t);
|
||||
t64_len[i] = sizeof(int64_t);
|
||||
float_len[i] = sizeof(float);
|
||||
double_len[i] = sizeof(double);
|
||||
bin_len[i] = sizeof(v.bin[0]);
|
||||
blob_len[i] = (int32_t)strlen(v.blob[i]);
|
||||
}
|
||||
|
||||
code = taos_stmt_bind_param_batch(stmt, params);
|
||||
CHECK_CODE(code, "taos_stmt_bind_param_batch");
|
||||
|
||||
code = taos_stmt_add_batch(stmt);
|
||||
CHECK_CODE(code, "taos_stmt_add_batch");
|
||||
|
||||
code = taos_stmt_execute(stmt);
|
||||
CHECK_CODE(code, "taos_stmt_execute");
|
||||
|
||||
taos_stmt_close(stmt);
|
||||
|
||||
free(t8_len);
|
||||
free(t16_len);
|
||||
free(t32_len);
|
||||
free(t64_len);
|
||||
free(float_len);
|
||||
free(double_len);
|
||||
free(bin_len);
|
||||
free(blob_len);
|
||||
}
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
TAOS* taos = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
taos = getNewConnection();
|
||||
taos_select_db(taos, TEST_DB);
|
||||
CHECK_CODE(code, "switch to database");
|
||||
|
||||
printf("************ prepare data *************\n");
|
||||
prepareData(taos);
|
||||
|
||||
for (int32_t i = 0; i < NUM_ROUNDS; ++i) {
|
||||
printf("************ verify schema-less *************\n");
|
||||
verifySchemaLess(taos);
|
||||
|
||||
printf("************ verify query *************\n");
|
||||
verifyQuery(taos);
|
||||
|
||||
printf("********* verify async query **********\n");
|
||||
verifyQueryAsync(taos);
|
||||
|
||||
printf("********* verify stmt query **********\n");
|
||||
veriryStmt(taos);
|
||||
|
||||
printf("done\n");
|
||||
}
|
||||
|
||||
taos_close(taos);
|
||||
taos_cleanup();
|
||||
|
||||
return 0;
|
||||
}
|
Loading…
Reference in New Issue