Merge pull request #5426 from taosdata/hotfix/TD-3030
[TD-3030]two queries use one query info issue
This commit is contained in:
commit
cac833343c
|
@ -520,7 +520,7 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);
|
||||
|
||||
pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
|
||||
tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d", pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex);
|
||||
tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qhandle:%" PRIX64, pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex, pSql->res.qhandle);
|
||||
} else {
|
||||
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
|
||||
assert(vgIndex >= 0 && vgIndex < numOfVgroups);
|
||||
|
@ -528,12 +528,12 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);
|
||||
|
||||
pRetrieveMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId);
|
||||
tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d", pSql, pTableIdList->vgInfo.vgId, vgIndex);
|
||||
tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qhandle:%" PRIX64, pSql, pTableIdList->vgInfo.vgId, vgIndex, pSql->res.qhandle);
|
||||
}
|
||||
} else {
|
||||
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
||||
pRetrieveMsg->header.vgId = htonl(pTableMeta->vgId);
|
||||
tscDebug("%p build fetch msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgId);
|
||||
tscDebug("%p build fetch msg from only one vgroup, vgId:%d, qhandle:%" PRIX64, pSql, pTableMeta->vgId, pSql->res.qhandle);
|
||||
}
|
||||
|
||||
pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg);
|
||||
|
|
|
@ -28,7 +28,7 @@ typedef void* qinfo_t;
|
|||
* @param qinfo
|
||||
* @return
|
||||
*/
|
||||
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo);
|
||||
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo, uint64_t *qId);
|
||||
|
||||
|
||||
/**
|
||||
|
@ -88,9 +88,10 @@ void* qOpenQueryMgmt(int32_t vgId);
|
|||
void qQueryMgmtNotifyClosed(void* pExecutor);
|
||||
void qQueryMgmtReOpen(void *pExecutor);
|
||||
void qCleanupQueryMgmt(void* pExecutor);
|
||||
void** qRegisterQInfo(void* pMgmt, uint64_t qInfo);
|
||||
void** qRegisterQInfo(void* pMgmt, uint64_t qId, uint64_t qInfo);
|
||||
void** qAcquireQInfo(void* pMgmt, uint64_t key);
|
||||
void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle);
|
||||
bool checkQIdEqual(void *qHandle, uint64_t qId);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -308,6 +308,7 @@ enum {
|
|||
|
||||
typedef struct SQInfo {
|
||||
void* signature;
|
||||
uint64_t qId;
|
||||
int32_t code; // error code to returned to client
|
||||
int64_t owner; // if it is in execution
|
||||
|
||||
|
@ -429,7 +430,7 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t nu
|
|||
|
||||
SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code);
|
||||
SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
|
||||
SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql);
|
||||
SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql, uint64_t *qId);
|
||||
int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, SQueryParam* param, bool isSTable);
|
||||
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);
|
||||
|
||||
|
|
|
@ -98,6 +98,9 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
|
|||
#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList)
|
||||
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)
|
||||
|
||||
|
||||
uint64_t queryHandleId = 0;
|
||||
|
||||
int32_t getMaximumIdleDurationSec() {
|
||||
return tsShellActivityTimer * 2;
|
||||
}
|
||||
|
@ -6111,9 +6114,13 @@ void setResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo) {
|
|||
pResultInfo->total = 0;
|
||||
}
|
||||
|
||||
FORCE_INLINE bool checkQIdEqual(void *qHandle, uint64_t qId) {
|
||||
return ((SQInfo *)qHandle)->qId == qId;
|
||||
}
|
||||
|
||||
SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr, SExprInfo* pExprs,
|
||||
SExprInfo* pSecExprs, STableGroupInfo* pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery,
|
||||
char* sql) {
|
||||
char* sql, uint64_t *qId) {
|
||||
int16_t numOfCols = pQueryMsg->numOfCols;
|
||||
int16_t numOfOutput = pQueryMsg->numOfOutput;
|
||||
|
||||
|
@ -6254,7 +6261,9 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
|
|||
// todo refactor
|
||||
pQInfo->query.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX);
|
||||
|
||||
qDebug("qmsg:%p QInfo:%p created", pQueryMsg, pQInfo);
|
||||
pQInfo->qId = atomic_add_fetch_64(&queryHandleId, 1);
|
||||
*qId = pQInfo->qId;
|
||||
qDebug("qmsg:%p QInfo:%" PRIu64 "-%p created", pQueryMsg, pQInfo->qId, pQInfo);
|
||||
return pQInfo;
|
||||
|
||||
_cleanup_qinfo:
|
||||
|
|
|
@ -68,7 +68,7 @@ void freeParam(SQueryParam *param) {
|
|||
tfree(param->prevResult);
|
||||
}
|
||||
|
||||
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qinfo_t* pQInfo) {
|
||||
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qinfo_t* pQInfo, uint64_t *qId) {
|
||||
assert(pQueryMsg != NULL && tsdb != NULL);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -158,7 +158,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
|
|||
goto _over;
|
||||
}
|
||||
|
||||
(*pQInfo) = createQInfoImpl(pQueryMsg, param.pGroupbyExpr, param.pExprs, param.pSecExprs, &tableGroupInfo, param.pTagColumnInfo, isSTableQuery, param.sql);
|
||||
(*pQInfo) = createQInfoImpl(pQueryMsg, param.pGroupbyExpr, param.pExprs, param.pSecExprs, &tableGroupInfo, param.pTagColumnInfo, isSTableQuery, param.sql, qId);
|
||||
|
||||
param.sql = NULL;
|
||||
param.pExprs = NULL;
|
||||
|
@ -472,7 +472,7 @@ void qCleanupQueryMgmt(void* pQMgmt) {
|
|||
qDebug("vgId:%d, queryMgmt cleanup completed", vgId);
|
||||
}
|
||||
|
||||
void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
|
||||
void** qRegisterQInfo(void* pMgmt, uint64_t qId, uint64_t qInfo) {
|
||||
if (pMgmt == NULL) {
|
||||
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||
return NULL;
|
||||
|
@ -492,8 +492,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
|
|||
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||
return NULL;
|
||||
} else {
|
||||
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE) qInfo;
|
||||
void** handle = taosCachePut(pQueryMgmt->qinfoPool, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE),
|
||||
void** handle = taosCachePut(pQueryMgmt->qinfoPool, &qId, sizeof(qId), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE),
|
||||
(getMaximumIdleDurationSec()*1000));
|
||||
pthread_mutex_unlock(&pQueryMgmt->lock);
|
||||
|
||||
|
|
|
@ -247,7 +247,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
|
|||
|
||||
if (contLen != 0) {
|
||||
qinfo_t pQInfo = NULL;
|
||||
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo);
|
||||
uint64_t qId = 0;
|
||||
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, &qId);
|
||||
|
||||
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
|
||||
pRsp->code = code;
|
||||
|
@ -259,22 +260,22 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
|
|||
|
||||
// current connect is broken
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t)pQInfo);
|
||||
handle = qRegisterQInfo(pVnode->qMgmt, qId, (uint64_t)pQInfo);
|
||||
if (handle == NULL) { // failed to register qhandle
|
||||
pRsp->code = terrno;
|
||||
terrno = 0;
|
||||
vError("vgId:%d, QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo,
|
||||
vError("vgId:%d, QInfo:%"PRIu64 "-%p register qhandle failed, return to app, code:%s", pVnode->vgId, qId, (void *)pQInfo,
|
||||
tstrerror(pRsp->code));
|
||||
qDestroyQueryInfo(pQInfo); // destroy it directly
|
||||
return pRsp->code;
|
||||
} else {
|
||||
assert(*handle == pQInfo);
|
||||
pRsp->qhandle = htobe64((uint64_t)pQInfo);
|
||||
pRsp->qhandle = htobe64(qId);
|
||||
}
|
||||
|
||||
if (handle != NULL &&
|
||||
vnodeNotifyCurrentQhandle(pRead->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
|
||||
vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle,
|
||||
vError("vgId:%d, QInfo:%"PRIu64 "-%p, query discarded since link is broken, %p", pVnode->vgId, qId, *handle,
|
||||
pRead->rpcHandle);
|
||||
pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
|
||||
|
@ -285,7 +286,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
|
|||
}
|
||||
|
||||
if (handle != NULL) {
|
||||
vTrace("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle);
|
||||
vTrace("vgId:%d, QInfo:%"PRIu64 "-%p, dnode query msg disposed, create qhandle and returns to app", vgId, qId, *handle);
|
||||
code = vnodePutItemIntoReadQueue(pVnode, handle, pRead->rpcHandle);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pRsp->code = code;
|
||||
|
@ -349,7 +350,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
|
|||
pRetrieve->free = htons(pRetrieve->free);
|
||||
pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
|
||||
|
||||
vTrace("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void *)pRetrieve->qhandle,
|
||||
vTrace("vgId:%d, QInfo:%" PRIu64 ", retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, pRetrieve->qhandle,
|
||||
pRetrieve->free, pRead->rpcHandle);
|
||||
|
||||
memset(pRet, 0, sizeof(SRspRet));
|
||||
|
@ -360,19 +361,19 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
|
|||
if (handle == NULL) {
|
||||
code = terrno;
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
} else if ((*handle) != (void *)pRetrieve->qhandle) {
|
||||
} else if (!checkQIdEqual(*handle, pRetrieve->qhandle)) {
|
||||
code = TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||
}
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
vError("vgId:%d, invalid handle in retrieving result, code:%s, QInfo:%p", pVnode->vgId, tstrerror(code), (void *)pRetrieve->qhandle);
|
||||
vError("vgId:%d, invalid handle in retrieving result, code:%s, QInfo:%" PRIu64, pVnode->vgId, tstrerror(code), pRetrieve->qhandle);
|
||||
vnodeBuildNoResultQueryRsp(pRet);
|
||||
return code;
|
||||
}
|
||||
|
||||
// kill current query and free corresponding resources.
|
||||
if (pRetrieve->free == 1) {
|
||||
vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
|
||||
vWarn("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pRetrieve->qhandle, *handle);
|
||||
qKillQuery(*handle);
|
||||
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
|
||||
|
||||
|
@ -383,7 +384,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
|
|||
|
||||
// register the qhandle to connect to quit query immediate if connection is broken
|
||||
if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
|
||||
vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle, pRead->rpcHandle);
|
||||
vError("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve discarded since link is broken, %p", pVnode->vgId, pRetrieve->qhandle, *handle, pRead->rpcHandle);
|
||||
code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||
qKillQuery(*handle);
|
||||
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
|
||||
|
@ -441,4 +442,4 @@ void vnodeWaitReadCompleted(SVnodeObj *pVnode) {
|
|||
vTrace("vgId:%d, queued rmsg num:%d", pVnode->vgId, pVnode->queuedRMsg);
|
||||
taosMsleep(10);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue