Merge pull request #5762 from taosdata/feature/TD-3734
[TD-3734]use global uid as query id
This commit is contained in:
commit
ef2246da4c
|
@ -39,6 +39,7 @@ extern int8_t tsEnableTelemetryReporting;
|
||||||
extern char tsEmail[];
|
extern char tsEmail[];
|
||||||
extern char tsArbitrator[];
|
extern char tsArbitrator[];
|
||||||
extern int8_t tsArbOnline;
|
extern int8_t tsArbOnline;
|
||||||
|
extern int32_t tsDnodeId;
|
||||||
|
|
||||||
// common
|
// common
|
||||||
extern int tsRpcTimer;
|
extern int tsRpcTimer;
|
||||||
|
|
|
@ -43,6 +43,7 @@ int8_t tsEnableVnodeBak = 1;
|
||||||
int8_t tsEnableTelemetryReporting = 1;
|
int8_t tsEnableTelemetryReporting = 1;
|
||||||
int8_t tsArbOnline = 0;
|
int8_t tsArbOnline = 0;
|
||||||
char tsEmail[TSDB_FQDN_LEN] = {0};
|
char tsEmail[TSDB_FQDN_LEN] = {0};
|
||||||
|
int32_t tsDnodeId = 0;
|
||||||
|
|
||||||
// common
|
// common
|
||||||
int32_t tsRpcTimer = 1000;
|
int32_t tsRpcTimer = 1000;
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "cJSON.h"
|
#include "cJSON.h"
|
||||||
#include "dnodeCfg.h"
|
#include "dnodeCfg.h"
|
||||||
|
#include "tglobal.h"
|
||||||
|
|
||||||
static SDnodeCfg tsCfg = {0};
|
static SDnodeCfg tsCfg = {0};
|
||||||
static pthread_mutex_t tsCfgMutex;
|
static pthread_mutex_t tsCfgMutex;
|
||||||
|
@ -70,6 +71,7 @@ static void dnodeResetCfg(SDnodeCfg *cfg) {
|
||||||
|
|
||||||
pthread_mutex_lock(&tsCfgMutex);
|
pthread_mutex_lock(&tsCfgMutex);
|
||||||
tsCfg.dnodeId = cfg->dnodeId;
|
tsCfg.dnodeId = cfg->dnodeId;
|
||||||
|
tsDnodeId = cfg->dnodeId;
|
||||||
tstrncpy(tsCfg.clusterId, cfg->clusterId, TSDB_CLUSTER_ID_LEN);
|
tstrncpy(tsCfg.clusterId, cfg->clusterId, TSDB_CLUSTER_ID_LEN);
|
||||||
dnodePrintCfg(cfg);
|
dnodePrintCfg(cfg);
|
||||||
dnodeWriteCfg();
|
dnodeWriteCfg();
|
||||||
|
|
|
@ -92,6 +92,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qId, uint64_t qInfo);
|
||||||
void** qAcquireQInfo(void* pMgmt, uint64_t key);
|
void** qAcquireQInfo(void* pMgmt, uint64_t key);
|
||||||
void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle);
|
void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle);
|
||||||
bool checkQIdEqual(void *qHandle, uint64_t qId);
|
bool checkQIdEqual(void *qHandle, uint64_t qId);
|
||||||
|
int64_t genQueryId(void);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -105,6 +105,30 @@ int32_t getMaximumIdleDurationSec() {
|
||||||
return tsShellActivityTimer * 2;
|
return tsShellActivityTimer * 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int64_t genQueryId(void) {
|
||||||
|
int64_t uid = 0;
|
||||||
|
int64_t did = tsDnodeId;
|
||||||
|
|
||||||
|
uid = did << 54;
|
||||||
|
|
||||||
|
int64_t pid = ((int64_t)taosGetPId()) & 0x3FF;
|
||||||
|
|
||||||
|
uid |= pid << 44;
|
||||||
|
|
||||||
|
int64_t ts = taosGetTimestampMs() & 0x1FFFFFFFF;
|
||||||
|
|
||||||
|
uid |= ts << 11;
|
||||||
|
|
||||||
|
int64_t sid = atomic_add_fetch_64(&queryHandleId, 1) & 0x7FF;
|
||||||
|
|
||||||
|
uid |= sid;
|
||||||
|
|
||||||
|
return uid;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) {
|
static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) {
|
||||||
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||||
if (pQuery->interval.intervalUnit != 'n' && pQuery->interval.intervalUnit != 'y') {
|
if (pQuery->interval.intervalUnit != 'n' && pQuery->interval.intervalUnit != 'y') {
|
||||||
|
@ -6184,6 +6208,8 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
|
||||||
goto _cleanup_qinfo;
|
goto _cleanup_qinfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pQInfo->qId = *qId;
|
||||||
|
|
||||||
// to make sure third party won't overwrite this structure
|
// to make sure third party won't overwrite this structure
|
||||||
pQInfo->signature = pQInfo;
|
pQInfo->signature = pQInfo;
|
||||||
SQuery* pQuery = &pQInfo->query;
|
SQuery* pQuery = &pQInfo->query;
|
||||||
|
@ -6316,8 +6342,6 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
|
||||||
// todo refactor
|
// todo refactor
|
||||||
pQInfo->query.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX);
|
pQInfo->query.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX);
|
||||||
|
|
||||||
pQInfo->qId = atomic_add_fetch_64(&queryHandleId, 1);
|
|
||||||
*qId = pQInfo->qId;
|
|
||||||
qDebug("qmsg:%p QInfo:%" PRIu64 "-%p created", pQueryMsg, pQInfo->qId, pQInfo);
|
qDebug("qmsg:%p QInfo:%" PRIu64 "-%p created", pQueryMsg, pQInfo->qId, pQInfo);
|
||||||
return pQInfo;
|
return pQInfo;
|
||||||
|
|
||||||
|
|
|
@ -197,6 +197,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
|
bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
|
||||||
SQInfo *pQInfo = (SQInfo *)qinfo;
|
SQInfo *pQInfo = (SQInfo *)qinfo;
|
||||||
assert(pQInfo && pQInfo->signature == pQInfo);
|
assert(pQInfo && pQInfo->signature == pQInfo);
|
||||||
|
|
|
@ -208,6 +208,7 @@ static void vnodeBuildNoResultQueryRsp(SRspRet *pRet) {
|
||||||
pRsp->completed = true;
|
pRsp->completed = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
|
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
|
||||||
void * pCont = pRead->pCont;
|
void * pCont = pRead->pCont;
|
||||||
int32_t contLen = pRead->contLen;
|
int32_t contLen = pRead->contLen;
|
||||||
|
@ -226,7 +227,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
|
||||||
|
|
||||||
if (contLen != 0) {
|
if (contLen != 0) {
|
||||||
qinfo_t pQInfo = NULL;
|
qinfo_t pQInfo = NULL;
|
||||||
uint64_t qId = 0;
|
uint64_t qId = genQueryId();
|
||||||
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, &qId);
|
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, &qId);
|
||||||
|
|
||||||
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
|
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
|
||||||
|
|
Loading…
Reference in New Issue