[TD-2165]<fix>: fix the bug that query can not be stopped if all query threads are busy.
This commit is contained in:
parent
34e374633a
commit
e51f8e5f51
|
@ -280,6 +280,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t cmd = pCmd->command;
|
int32_t cmd = pCmd->command;
|
||||||
|
|
||||||
// set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
|
// set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
|
||||||
if (cmd == TSDB_SQL_INSERT && rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
|
if (cmd == TSDB_SQL_INSERT && rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
|
||||||
pSql->cmd.submitSchema = 1;
|
pSql->cmd.submitSchema = 1;
|
||||||
|
@ -395,8 +396,7 @@ int doProcessSql(SSqlObj *pSql) {
|
||||||
pCmd->command == TSDB_SQL_CONNECT ||
|
pCmd->command == TSDB_SQL_CONNECT ||
|
||||||
pCmd->command == TSDB_SQL_HB ||
|
pCmd->command == TSDB_SQL_HB ||
|
||||||
pCmd->command == TSDB_SQL_META ||
|
pCmd->command == TSDB_SQL_META ||
|
||||||
pCmd->command == TSDB_SQL_STABLEVGROUP||
|
pCmd->command == TSDB_SQL_STABLEVGROUP) {
|
||||||
pCmd->command == TSDB_SQL_CANCEL_QUERY) {
|
|
||||||
pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
|
pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -451,9 +451,10 @@ int tscProcessSql(SSqlObj *pSql) {
|
||||||
|
|
||||||
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload;
|
SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload;
|
||||||
pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
|
|
||||||
|
|
||||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
|
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
|
||||||
|
pRetrieveMsg->free = htons(pQueryInfo->type);
|
||||||
|
pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
|
||||||
|
|
||||||
// todo valid the vgroupId at the client side
|
// todo valid the vgroupId at the client side
|
||||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||||
|
@ -1393,42 +1394,42 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tscBuildCancelQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
//int tscBuildCancelQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
SCancelQueryMsg *pCancelMsg = (SCancelQueryMsg*) pSql->cmd.payload;
|
// SCancelQueryMsg *pCancelMsg = (SCancelQueryMsg*) pSql->cmd.payload;
|
||||||
pCancelMsg->qhandle = htobe64(pSql->res.qhandle);
|
// pCancelMsg->qhandle = htobe64(pSql->res.qhandle);
|
||||||
|
//
|
||||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
|
// SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
|
||||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||||
|
//
|
||||||
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
|
// if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
|
||||||
int32_t vgIndex = pTableMetaInfo->vgroupIndex;
|
// int32_t vgIndex = pTableMetaInfo->vgroupIndex;
|
||||||
if (pTableMetaInfo->pVgroupTables == NULL) {
|
// if (pTableMetaInfo->pVgroupTables == NULL) {
|
||||||
SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList;
|
// SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList;
|
||||||
assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);
|
// assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);
|
||||||
|
//
|
||||||
pCancelMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
|
// pCancelMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
|
||||||
tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex);
|
// tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex);
|
||||||
} else {
|
// } else {
|
||||||
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
|
// int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
|
||||||
assert(vgIndex >= 0 && vgIndex < numOfVgroups);
|
// assert(vgIndex >= 0 && vgIndex < numOfVgroups);
|
||||||
|
//
|
||||||
SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);
|
// SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);
|
||||||
|
//
|
||||||
pCancelMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId);
|
// pCancelMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId);
|
||||||
tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pTableIdList->vgInfo.vgId, vgIndex);
|
// tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pTableIdList->vgInfo.vgId, vgIndex);
|
||||||
}
|
// }
|
||||||
} else {
|
// } else {
|
||||||
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
// STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
||||||
pCancelMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
|
// pCancelMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
|
||||||
tscDebug("%p build cancel query msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgroupInfo.vgId);
|
// tscDebug("%p build cancel query msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgroupInfo.vgId);
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
pSql->cmd.payloadLen = sizeof(SCancelQueryMsg);
|
// pSql->cmd.payloadLen = sizeof(SCancelQueryMsg);
|
||||||
pSql->cmd.msgType = TSDB_MSG_TYPE_CANCEL_QUERY;
|
// pSql->cmd.msgType = TSDB_MSG_TYPE_CANCEL_QUERY;
|
||||||
|
//
|
||||||
pCancelMsg->header.contLen = htonl(sizeof(SCancelQueryMsg));
|
// pCancelMsg->header.contLen = htonl(sizeof(SCancelQueryMsg));
|
||||||
return TSDB_CODE_SUCCESS;
|
// return TSDB_CODE_SUCCESS;
|
||||||
}
|
//}
|
||||||
|
|
||||||
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
SSqlCmd *pCmd = &pSql->cmd;
|
SSqlCmd *pCmd = &pSql->cmd;
|
||||||
|
@ -2432,7 +2433,6 @@ void tscInitMsgsFp() {
|
||||||
tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
|
tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
|
||||||
tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
|
tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
|
||||||
tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
|
tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
|
||||||
tscBuildMsg[TSDB_SQL_CANCEL_QUERY] = tscBuildCancelQueryMsg;
|
|
||||||
tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;
|
tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;
|
||||||
|
|
||||||
tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
|
tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
|
||||||
|
|
|
@ -605,7 +605,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) {
|
||||||
cmd == TSDB_SQL_RETRIEVE ||
|
cmd == TSDB_SQL_RETRIEVE ||
|
||||||
cmd == TSDB_SQL_FETCH)) {
|
cmd == TSDB_SQL_FETCH)) {
|
||||||
pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
|
pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
|
||||||
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_CANCEL_QUERY;
|
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
|
||||||
tscDebug("%p send msg to dnode to free qhandle ASAP before free sqlObj, command:%s", pSql, sqlCmd[pCmd->command]);
|
tscDebug("%p send msg to dnode to free qhandle ASAP before free sqlObj, command:%s", pSql, sqlCmd[pCmd->command]);
|
||||||
|
|
||||||
tscProcessSql(pSql);
|
tscProcessSql(pSql);
|
||||||
|
|
|
@ -2149,6 +2149,29 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool needRetryInsert(SSqlObj* pParentObj, int32_t numOfSub) {
|
||||||
|
if (pParentObj->retry > pParentObj->maxRetry) {
|
||||||
|
tscError("%p max retry reached, abort the retry effort", pParentObj)
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfSub; ++i) {
|
||||||
|
int32_t code = pParentObj->pSubs[i]->res.code;
|
||||||
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (code != TSDB_CODE_TDB_TABLE_RECONFIGURE && code != TSDB_CODE_TDB_INVALID_TABLE_ID &&
|
||||||
|
code != TSDB_CODE_VND_INVALID_VGROUP_ID && code != TSDB_CODE_RPC_NETWORK_UNAVAIL &&
|
||||||
|
code != TSDB_CODE_APP_NOT_READY) {
|
||||||
|
pParentObj->res.code = code;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) {
|
static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) {
|
||||||
SInsertSupporter *pSupporter = (SInsertSupporter *)param;
|
SInsertSupporter *pSupporter = (SInsertSupporter *)param;
|
||||||
SSqlObj* pParentObj = pSupporter->pSql;
|
SSqlObj* pParentObj = pSupporter->pSql;
|
||||||
|
@ -2190,8 +2213,12 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
|
||||||
int32_t v = (pParentObj->res.code != TSDB_CODE_SUCCESS) ? pParentObj->res.code : (int32_t)pParentObj->res.numOfRows;
|
int32_t v = (pParentObj->res.code != TSDB_CODE_SUCCESS) ? pParentObj->res.code : (int32_t)pParentObj->res.numOfRows;
|
||||||
(*pParentObj->fp)(pParentObj->param, pParentObj, v);
|
(*pParentObj->fp)(pParentObj->param, pParentObj, v);
|
||||||
} else {
|
} else {
|
||||||
int32_t numOfFailed = 0;
|
if (!needRetryInsert(pParentObj, numOfSub)) {
|
||||||
|
tscQueueAsyncRes(pParentObj);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t numOfFailed = 0;
|
||||||
for(int32_t i = 0; i < numOfSub; ++i) {
|
for(int32_t i = 0; i < numOfSub; ++i) {
|
||||||
SSqlObj* pSql = pParentObj->pSubs[i];
|
SSqlObj* pSql = pParentObj->pSubs[i];
|
||||||
if (pSql->res.code != TSDB_CODE_SUCCESS) {
|
if (pSql->res.code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -2221,7 +2248,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
|
||||||
|
|
||||||
tscResetSqlCmdObj(&pParentObj->cmd, false);
|
tscResetSqlCmdObj(&pParentObj->cmd, false);
|
||||||
|
|
||||||
tscDebug("%p re-parse sql to generate data", pParentObj);
|
tscDebug("%p re-parse sql to generate submit data, retry:%d", pParentObj, pParentObj->retry++);
|
||||||
int32_t code = tsParseSql(pParentObj, true);
|
int32_t code = tsParseSql(pParentObj, true);
|
||||||
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
|
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
|
||||||
|
|
||||||
|
|
|
@ -36,7 +36,6 @@ enum {
|
||||||
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_FETCH, "fetch" )
|
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_FETCH, "fetch" )
|
||||||
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_INSERT, "insert" )
|
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_INSERT, "insert" )
|
||||||
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_UPDATE_TAGS_VAL, "update-tag-val" )
|
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_UPDATE_TAGS_VAL, "update-tag-val" )
|
||||||
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CANCEL_QUERY, "cancel-query" ) // send cancel msg to vnode to stop query
|
|
||||||
|
|
||||||
// the SQL below is for mgmt node
|
// the SQL below is for mgmt node
|
||||||
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" )
|
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" )
|
||||||
|
|
|
@ -35,7 +35,6 @@ int32_t dnodeInitShell() {
|
||||||
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue;
|
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue;
|
||||||
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVReadQueue;
|
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVReadQueue;
|
||||||
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVReadQueue;
|
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVReadQueue;
|
||||||
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CANCEL_QUERY] = dnodeDispatchToVReadQueue;
|
|
||||||
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue;
|
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue;
|
||||||
|
|
||||||
// the following message shall be treated as mnode write
|
// the following message shall be treated as mnode write
|
||||||
|
|
|
@ -29,14 +29,14 @@ int32_t dnodeInitVRead() {
|
||||||
tsVQueryWP.name = "vquery";
|
tsVQueryWP.name = "vquery";
|
||||||
tsVQueryWP.workerFp = dnodeProcessReadQueue;
|
tsVQueryWP.workerFp = dnodeProcessReadQueue;
|
||||||
tsVQueryWP.min = tsNumOfCores;
|
tsVQueryWP.min = tsNumOfCores;
|
||||||
tsVQueryWP.max = tsNumOfCores * tsNumOfThreadsPerCore;
|
tsVQueryWP.max = tsNumOfCores/* * tsNumOfThreadsPerCore*/;
|
||||||
if (tsVQueryWP.max <= tsVQueryWP.min * 2) tsVQueryWP.max = 2 * tsVQueryWP.min;
|
// if (tsVQueryWP.max <= tsVQueryWP.min * 2) tsVQueryWP.max = 2 * tsVQueryWP.min;
|
||||||
if (tWorkerInit(&tsVQueryWP) != 0) return -1;
|
if (tWorkerInit(&tsVQueryWP) != 0) return -1;
|
||||||
|
|
||||||
tsVFetchWP.name = "vfetch";
|
tsVFetchWP.name = "vfetch";
|
||||||
tsVFetchWP.workerFp = dnodeProcessReadQueue;
|
tsVFetchWP.workerFp = dnodeProcessReadQueue;
|
||||||
tsVFetchWP.min = 1;
|
tsVFetchWP.min = MIN(4, tsNumOfCores);
|
||||||
tsVFetchWP.max = 1;
|
tsVFetchWP.max = tsVFetchWP.min;
|
||||||
if (tWorkerInit(&tsVFetchWP) != 0) return -1;
|
if (tWorkerInit(&tsVFetchWP) != 0) return -1;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -45,7 +45,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" )
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" )
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" )
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" )
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" )
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" )
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" )
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CANCEL_QUERY, "cancel-query" )
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" )
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" )
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" )
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" )
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" )
|
||||||
|
|
||||||
|
@ -502,11 +502,6 @@ typedef struct {
|
||||||
uint16_t free;
|
uint16_t free;
|
||||||
} SRetrieveTableMsg;
|
} SRetrieveTableMsg;
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
SMsgHead header;
|
|
||||||
uint64_t qhandle;
|
|
||||||
} SCancelQueryMsg;
|
|
||||||
|
|
||||||
typedef struct SRetrieveTableRsp {
|
typedef struct SRetrieveTableRsp {
|
||||||
int32_t numOfRows;
|
int32_t numOfRows;
|
||||||
int8_t completed; // all results are returned to client
|
int8_t completed; // all results are returned to client
|
||||||
|
|
|
@ -191,7 +191,7 @@ typedef struct SQueryRuntimeEnv {
|
||||||
void* pQueryHandle;
|
void* pQueryHandle;
|
||||||
void* pSecQueryHandle; // another thread for
|
void* pSecQueryHandle; // another thread for
|
||||||
bool stableQuery; // super table query or not
|
bool stableQuery; // super table query or not
|
||||||
bool topBotQuery; // false
|
bool topBotQuery; // TODO used bitwise flag
|
||||||
bool groupbyNormalCol; // denote if this is a groupby normal column query
|
bool groupbyNormalCol; // denote if this is a groupby normal column query
|
||||||
bool hasTagResults; // if there are tag values in final result or not
|
bool hasTagResults; // if there are tag values in final result or not
|
||||||
bool timeWindowInterpo;// if the time window start/end required interpolation
|
bool timeWindowInterpo;// if the time window start/end required interpolation
|
||||||
|
@ -223,7 +223,6 @@ typedef struct SQInfo {
|
||||||
STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo>
|
STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo>
|
||||||
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
|
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
|
||||||
SQueryRuntimeEnv runtimeEnv;
|
SQueryRuntimeEnv runtimeEnv;
|
||||||
// SArray* arrTableIdInfo;
|
|
||||||
SHashObj* arrTableIdInfo;
|
SHashObj* arrTableIdInfo;
|
||||||
int32_t groupIndex;
|
int32_t groupIndex;
|
||||||
|
|
||||||
|
@ -239,6 +238,7 @@ typedef struct SQInfo {
|
||||||
tsem_t ready;
|
tsem_t ready;
|
||||||
int32_t dataReady; // denote if query result is ready or not
|
int32_t dataReady; // denote if query result is ready or not
|
||||||
void* rspContext; // response context
|
void* rspContext; // response context
|
||||||
|
int64_t startExecTs; // start to exec timestamp
|
||||||
} SQInfo;
|
} SQInfo;
|
||||||
|
|
||||||
#endif // TDENGINE_QUERYEXECUTOR_H
|
#endif // TDENGINE_QUERYEXECUTOR_H
|
||||||
|
|
|
@ -128,11 +128,14 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
|
||||||
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
|
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
|
||||||
#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList)
|
#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList)
|
||||||
#define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))
|
#define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))
|
||||||
|
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)
|
||||||
|
|
||||||
static void setQueryStatus(SQuery *pQuery, int8_t status);
|
static void setQueryStatus(SQuery *pQuery, int8_t status);
|
||||||
static void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv);
|
static void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv);
|
||||||
|
|
||||||
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)
|
static int32_t getMaximumIdleDurationSec() {
|
||||||
|
return tsShellActivityTimer * 2;
|
||||||
|
}
|
||||||
|
|
||||||
static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) {
|
static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) {
|
||||||
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||||
|
@ -2138,8 +2141,31 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
pRuntimeEnv->pool = destroyResultRowPool(pRuntimeEnv->pool);
|
pRuntimeEnv->pool = destroyResultRowPool(pRuntimeEnv->pool);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool needBuildResAfterQueryComplete(SQInfo* pQInfo) {
|
||||||
|
return pQInfo->rspContext != NULL;
|
||||||
|
}
|
||||||
|
|
||||||
#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED)
|
#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED)
|
||||||
|
|
||||||
|
static bool isQueryKilled(SQInfo *pQInfo) {
|
||||||
|
if (IS_QUERY_KILLED(pQInfo)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// query has been executed more than tsShellActivityTimer, and the retrieve has not arrived
|
||||||
|
// abort current query execution.
|
||||||
|
if (pQInfo->owner != 0 && ((taosGetTimestampSec() - pQInfo->startExecTs) > getMaximumIdleDurationSec()) &&
|
||||||
|
(!needBuildResAfterQueryComplete(pQInfo))) {
|
||||||
|
|
||||||
|
assert(pQInfo->startExecTs != 0);
|
||||||
|
qDebug("QInfo:%p retrieve not arrive beyond %d sec, abort current query execution, start:%"PRId64", current:%d", pQInfo, 1,
|
||||||
|
pQInfo->startExecTs, taosGetTimestampSec());
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
static void setQueryKilled(SQInfo *pQInfo) { pQInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED;}
|
static void setQueryKilled(SQInfo *pQInfo) { pQInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED;}
|
||||||
|
|
||||||
static bool isFixedOutputQuery(SQueryRuntimeEnv* pRuntimeEnv) {
|
static bool isFixedOutputQuery(SQueryRuntimeEnv* pRuntimeEnv) {
|
||||||
|
@ -2864,7 +2890,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
while (tsdbNextDataBlock(pQueryHandle)) {
|
while (tsdbNextDataBlock(pQueryHandle)) {
|
||||||
summary->totalBlocks += 1;
|
summary->totalBlocks += 1;
|
||||||
|
|
||||||
if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) {
|
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3432,7 +3458,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
|
||||||
int64_t startt = taosGetTimestampMs();
|
int64_t startt = taosGetTimestampMs();
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (IS_QUERY_KILLED(pQInfo)) {
|
if (isQueryKilled(pQInfo)) {
|
||||||
qDebug("QInfo:%p it is already killed, abort", pQInfo);
|
qDebug("QInfo:%p it is already killed, abort", pQInfo);
|
||||||
|
|
||||||
tfree(pTableList);
|
tfree(pTableList);
|
||||||
|
@ -4018,7 +4044,7 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
|
||||||
cond.twindow.skey, cond.twindow.ekey);
|
cond.twindow.skey, cond.twindow.ekey);
|
||||||
|
|
||||||
// check if query is killed or not
|
// check if query is killed or not
|
||||||
if (IS_QUERY_KILLED(pQInfo)) {
|
if (isQueryKilled(pQInfo)) {
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4675,7 +4701,7 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
|
|
||||||
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
|
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
|
||||||
while (tsdbNextDataBlock(pQueryHandle)) {
|
while (tsdbNextDataBlock(pQueryHandle)) {
|
||||||
if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) {
|
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5112,7 +5138,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
|
||||||
while (tsdbNextDataBlock(pQueryHandle)) {
|
while (tsdbNextDataBlock(pQueryHandle)) {
|
||||||
summary->totalBlocks += 1;
|
summary->totalBlocks += 1;
|
||||||
|
|
||||||
if (IS_QUERY_KILLED(pQInfo)) {
|
if (isQueryKilled(pQInfo)) {
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5491,7 +5517,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
||||||
while ((hasMoreBlock = tsdbNextDataBlock(pQueryHandle)) == true) {
|
while ((hasMoreBlock = tsdbNextDataBlock(pQueryHandle)) == true) {
|
||||||
summary->totalBlocks += 1;
|
summary->totalBlocks += 1;
|
||||||
|
|
||||||
if (IS_QUERY_KILLED(pQInfo)) {
|
if (isQueryKilled(pQInfo)) {
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5622,7 +5648,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
||||||
1 == taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList));
|
1 == taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList));
|
||||||
|
|
||||||
while (pQInfo->tableIndex < pQInfo->tableqinfoGroupInfo.numOfTables) {
|
while (pQInfo->tableIndex < pQInfo->tableqinfoGroupInfo.numOfTables) {
|
||||||
if (IS_QUERY_KILLED(pQInfo)) {
|
if (isQueryKilled(pQInfo)) {
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5808,7 +5834,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
|
||||||
qDebug("QInfo:%p master scan completed, elapsed time: %" PRId64 "ms, reverse scan start", pQInfo, el);
|
qDebug("QInfo:%p master scan completed, elapsed time: %" PRId64 "ms, reverse scan start", pQInfo, el);
|
||||||
|
|
||||||
// query error occurred or query is killed, abort current execution
|
// query error occurred or query is killed, abort current execution
|
||||||
if (pQInfo->code != TSDB_CODE_SUCCESS || IS_QUERY_KILLED(pQInfo)) {
|
if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) {
|
||||||
qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code));
|
qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code));
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||||
}
|
}
|
||||||
|
@ -5829,7 +5855,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
|
||||||
|
|
||||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||||
|
|
||||||
if (pQInfo->code != TSDB_CODE_SUCCESS || IS_QUERY_KILLED(pQInfo)) {
|
if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) {
|
||||||
qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code));
|
qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code));
|
||||||
//TODO finalizeQueryResult may cause SEGSEV, since the memory may not allocated yet, add a cleanup function instead
|
//TODO finalizeQueryResult may cause SEGSEV, since the memory may not allocated yet, add a cleanup function instead
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||||
|
@ -5945,7 +5971,7 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
|
||||||
pQuery->rec.rows = getNumOfResult(pRuntimeEnv);
|
pQuery->rec.rows = getNumOfResult(pRuntimeEnv);
|
||||||
doSecondaryArithmeticProcess(pQuery);
|
doSecondaryArithmeticProcess(pQuery);
|
||||||
|
|
||||||
if (IS_QUERY_KILLED(pQInfo)) {
|
if (isQueryKilled(pQInfo)) {
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -7521,7 +7547,7 @@ static bool doBuildResCheck(SQInfo* pQInfo) {
|
||||||
pthread_mutex_lock(&pQInfo->lock);
|
pthread_mutex_lock(&pQInfo->lock);
|
||||||
|
|
||||||
pQInfo->dataReady = QUERY_RESULT_READY;
|
pQInfo->dataReady = QUERY_RESULT_READY;
|
||||||
buildRes = (pQInfo->rspContext != NULL);
|
buildRes = needBuildResAfterQueryComplete(pQInfo);
|
||||||
|
|
||||||
// clear qhandle owner, it must be in the secure area. other thread may run ahead before current, after it is
|
// clear qhandle owner, it must be in the secure area. other thread may run ahead before current, after it is
|
||||||
// put into task to be executed.
|
// put into task to be executed.
|
||||||
|
@ -7530,6 +7556,7 @@ static bool doBuildResCheck(SQInfo* pQInfo) {
|
||||||
|
|
||||||
pthread_mutex_unlock(&pQInfo->lock);
|
pthread_mutex_unlock(&pQInfo->lock);
|
||||||
|
|
||||||
|
// used in retrieve blocking model.
|
||||||
tsem_post(&pQInfo->ready);
|
tsem_post(&pQInfo->ready);
|
||||||
return buildRes;
|
return buildRes;
|
||||||
}
|
}
|
||||||
|
@ -7546,7 +7573,9 @@ bool qTableQuery(qinfo_t qinfo) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (IS_QUERY_KILLED(pQInfo)) {
|
pQInfo->startExecTs = taosGetTimestampSec();
|
||||||
|
|
||||||
|
if (isQueryKilled(pQInfo)) {
|
||||||
qDebug("QInfo:%p it is already killed, abort", pQInfo);
|
qDebug("QInfo:%p it is already killed, abort", pQInfo);
|
||||||
return doBuildResCheck(pQInfo);
|
return doBuildResCheck(pQInfo);
|
||||||
}
|
}
|
||||||
|
@ -7578,7 +7607,7 @@ bool qTableQuery(qinfo_t qinfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SQuery* pQuery = pRuntimeEnv->pQuery;
|
SQuery* pQuery = pRuntimeEnv->pQuery;
|
||||||
if (IS_QUERY_KILLED(pQInfo)) {
|
if (isQueryKilled(pQInfo)) {
|
||||||
qDebug("QInfo:%p query is killed", pQInfo);
|
qDebug("QInfo:%p query is killed", pQInfo);
|
||||||
} else if (pQuery->rec.rows == 0) {
|
} else if (pQuery->rec.rows == 0) {
|
||||||
qDebug("QInfo:%p over, %" PRIzu " tables queried, %"PRId64" rows are returned", pQInfo, pQInfo->tableqinfoGroupInfo.numOfTables, pQuery->rec.total);
|
qDebug("QInfo:%p over, %" PRIzu " tables queried, %"PRId64" rows are returned", pQInfo, pQInfo->tableqinfoGroupInfo.numOfTables, pQuery->rec.total);
|
||||||
|
@ -7607,6 +7636,7 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
if (tsHalfCoresForQuery) {
|
if (tsHalfCoresForQuery) {
|
||||||
|
pQInfo->rspContext = pRspContext;
|
||||||
tsem_wait(&pQInfo->ready);
|
tsem_wait(&pQInfo->ready);
|
||||||
*buildRes = true;
|
*buildRes = true;
|
||||||
code = pQInfo->code;
|
code = pQInfo->code;
|
||||||
|
@ -7614,12 +7644,12 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
|
||||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||||
|
|
||||||
pthread_mutex_lock(&pQInfo->lock);
|
pthread_mutex_lock(&pQInfo->lock);
|
||||||
assert(pQInfo->rspContext == NULL);
|
|
||||||
|
|
||||||
|
assert(pQInfo->rspContext == NULL);
|
||||||
if (pQInfo->dataReady == QUERY_RESULT_READY) {
|
if (pQInfo->dataReady == QUERY_RESULT_READY) {
|
||||||
*buildRes = true;
|
*buildRes = true;
|
||||||
qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%" PRId64 ", code:%d", pQInfo, pQuery->rowSize,
|
qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%" PRId64 ", code:%s", pQInfo, pQuery->rowSize,
|
||||||
pQuery->rec.rows, pQInfo->code);
|
pQuery->rec.rows, tstrerror(pQInfo->code));
|
||||||
} else {
|
} else {
|
||||||
*buildRes = false;
|
*buildRes = false;
|
||||||
qDebug("QInfo:%p retrieve req set query return result after paused", pQInfo);
|
qDebug("QInfo:%p retrieve req set query return result after paused", pQInfo);
|
||||||
|
@ -7697,7 +7727,7 @@ int32_t qQueryCompleted(qinfo_t qinfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
|
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
|
||||||
return IS_QUERY_KILLED(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER);
|
return isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qKillQuery(qinfo_t qinfo) {
|
int32_t qKillQuery(qinfo_t qinfo) {
|
||||||
|
@ -7994,8 +8024,6 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
const int32_t DEFAULT_QHANDLE_LIFE_SPAN = tsShellActivityTimer * 2 * 1000;
|
|
||||||
|
|
||||||
SQueryMgmt *pQueryMgmt = pMgmt;
|
SQueryMgmt *pQueryMgmt = pMgmt;
|
||||||
if (pQueryMgmt->qinfoPool == NULL) {
|
if (pQueryMgmt->qinfoPool == NULL) {
|
||||||
qError("QInfo:%p failed to add qhandle into qMgmt, since qMgmt is closed", (void *)qInfo);
|
qError("QInfo:%p failed to add qhandle into qMgmt, since qMgmt is closed", (void *)qInfo);
|
||||||
|
@ -8011,7 +8039,8 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
|
||||||
return NULL;
|
return NULL;
|
||||||
} else {
|
} else {
|
||||||
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE) qInfo;
|
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE) qInfo;
|
||||||
void** handle = taosCachePut(pQueryMgmt->qinfoPool, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE), DEFAULT_QHANDLE_LIFE_SPAN);
|
void** handle = taosCachePut(pQueryMgmt->qinfoPool, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE),
|
||||||
|
(getMaximumIdleDurationSec()*1000));
|
||||||
// pthread_mutex_unlock(&pQueryMgmt->lock);
|
// pthread_mutex_unlock(&pQueryMgmt->lock);
|
||||||
|
|
||||||
return handle;
|
return handle;
|
||||||
|
|
|
@ -24,15 +24,12 @@
|
||||||
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SVReadMsg *pRead);
|
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SVReadMsg *pRead);
|
||||||
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead);
|
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead);
|
||||||
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead);
|
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead);
|
||||||
static int32_t vnodeProcessCancelMsg(SVnodeObj *pVnode, SVReadMsg *pRead);
|
|
||||||
|
|
||||||
static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId);
|
static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId);
|
||||||
|
|
||||||
int32_t vnodeInitRead(void) {
|
int32_t vnodeInitRead(void) {
|
||||||
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg;
|
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg;
|
||||||
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
|
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
|
||||||
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_CANCEL_QUERY] = vnodeProcessCancelMsg;
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -120,8 +117,7 @@ int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qt
|
||||||
atomic_add_fetch_32(&pVnode->refCount, 1);
|
atomic_add_fetch_32(&pVnode->refCount, 1);
|
||||||
atomic_add_fetch_32(&pVnode->queuedRMsg, 1);
|
atomic_add_fetch_32(&pVnode->queuedRMsg, 1);
|
||||||
|
|
||||||
if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRead->msgType == TSDB_MSG_TYPE_CANCEL_QUERY ||
|
if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRead->msgType == TSDB_MSG_TYPE_FETCH) {
|
||||||
pRead->msgType == TSDB_MSG_TYPE_FETCH) {
|
|
||||||
vTrace("vgId:%d, write into vfetch queue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg);
|
vTrace("vgId:%d, write into vfetch queue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg);
|
||||||
return taosWriteQitem(pVnode->fqueue, qtype, pRead);
|
return taosWriteQitem(pVnode->fqueue, qtype, pRead);
|
||||||
} else {
|
} else {
|
||||||
|
@ -202,20 +198,23 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
|
||||||
memset(pRet, 0, sizeof(SRspRet));
|
memset(pRet, 0, sizeof(SRspRet));
|
||||||
|
|
||||||
// qHandle needs to be freed correctly
|
// qHandle needs to be freed correctly
|
||||||
assert(pRead->code != TSDB_CODE_RPC_NETWORK_UNAVAIL);
|
if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
|
||||||
|
vError("error rpc msg in query, %s", tstrerror(pRead->code));
|
||||||
|
}
|
||||||
|
// assert(pRead->code != TSDB_CODE_RPC_NETWORK_UNAVAIL);
|
||||||
// if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
|
// if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
|
||||||
// SCancelQueryMsg *pCancelMsg = (SCancelQueryMsg *)pRead->pCont;
|
// SCancelQueryMsg *pMsg = (SCancelQueryMsg *)pRead->pCont;
|
||||||
//// pCancelMsg->free = htons(killQueryMsg->free);
|
//// pMsg->free = htons(killQueryMsg->free);
|
||||||
// pCancelMsg->qhandle = htobe64(pCancelMsg->qhandle);
|
// pMsg->qhandle = htobe64(pMsg->qhandle);
|
||||||
//
|
//
|
||||||
// vWarn("QInfo:%p connection %p broken, kill query", (void *)pCancelMsg->qhandle, pRead->rpcHandle);
|
// vWarn("QInfo:%p connection %p broken, kill query", (void *)pMsg->qhandle, pRead->rpcHandle);
|
||||||
//// assert(pRead->contLen > 0 && pCancelMsg->free == 1);
|
//// assert(pRead->contLen > 0 && pMsg->free == 1);
|
||||||
//
|
//
|
||||||
// void **qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t)pCancelMsg->qhandle);
|
// void **qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t)pMsg->qhandle);
|
||||||
// if (qhandle == NULL || *qhandle == NULL) {
|
// if (qhandle == NULL || *qhandle == NULL) {
|
||||||
// vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void *)pCancelMsg->qhandle, pRead->rpcHandle);
|
// vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void *)pMsg->qhandle, pRead->rpcHandle);
|
||||||
// } else {
|
// } else {
|
||||||
// assert(*qhandle == (void *)pCancelMsg->qhandle);
|
// assert(*qhandle == (void *)pMsg->qhandle);
|
||||||
//
|
//
|
||||||
// qKillQuery(*qhandle);
|
// qKillQuery(*qhandle);
|
||||||
// qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, true);
|
// qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, true);
|
||||||
|
@ -349,16 +348,16 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(pRetrieve->free != 1);
|
// kill current query and free corresponding resources.
|
||||||
// if (pRetrieve->free == 1) {
|
if (pRetrieve->free == 1) {
|
||||||
// vWarn("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
|
vWarn("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
|
||||||
// qKillQuery(*handle);
|
qKillQuery(*handle);
|
||||||
// qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
|
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
|
||||||
//
|
|
||||||
// vnodeBuildNoResultQueryRsp(pRet);
|
vnodeBuildNoResultQueryRsp(pRet);
|
||||||
// code = TSDB_CODE_TSC_QUERY_CANCELLED;
|
code = TSDB_CODE_TSC_QUERY_CANCELLED;
|
||||||
// return code;
|
return code;
|
||||||
// }
|
}
|
||||||
|
|
||||||
// register the qhandle to connect to quit query immediate if connection is broken
|
// register the qhandle to connect to quit query immediate if connection is broken
|
||||||
if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
|
if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -406,47 +405,11 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
|
||||||
// notify connection(handle) that current qhandle is created, if current connection from
|
// notify connection(handle) that current qhandle is created, if current connection from
|
||||||
// client is broken, the query needs to be killed immediately.
|
// client is broken, the query needs to be killed immediately.
|
||||||
int32_t vnodeNotifyCurrentQhandle(void *handle, void *qhandle, int32_t vgId) {
|
int32_t vnodeNotifyCurrentQhandle(void *handle, void *qhandle, int32_t vgId) {
|
||||||
SCancelQueryMsg *pCancelMsg = rpcMallocCont(sizeof(SCancelQueryMsg));
|
SRetrieveTableMsg *pMsg = rpcMallocCont(sizeof(SRetrieveTableMsg));
|
||||||
pCancelMsg->qhandle = htobe64((uint64_t)qhandle);
|
pMsg->qhandle = htobe64((uint64_t)qhandle);
|
||||||
pCancelMsg->header.vgId = htonl(vgId);
|
pMsg->header.vgId = htonl(vgId);
|
||||||
pCancelMsg->header.contLen = htonl(sizeof(SCancelQueryMsg));
|
pMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));
|
||||||
|
|
||||||
vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle);
|
vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle);
|
||||||
return rpcReportProgress(handle, (char *)pCancelMsg, sizeof(SCancelQueryMsg));
|
return rpcReportProgress(handle, (char *)pMsg, sizeof(SRetrieveTableMsg));
|
||||||
}
|
|
||||||
|
|
||||||
int32_t vnodeProcessCancelMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
|
|
||||||
void *pCont = pRead->pCont;
|
|
||||||
SRspRet *pRet = &pRead->rspRet;
|
|
||||||
|
|
||||||
SCancelQueryMsg *pCancel = pCont;
|
|
||||||
pCancel->qhandle = htobe64(pCancel->qhandle);
|
|
||||||
|
|
||||||
vDebug("vgId:%d, QInfo:%p, cancel query msg is disposed, conn:%p", pVnode->vgId, (void *)pCancel->qhandle,
|
|
||||||
pRead->rpcHandle);
|
|
||||||
|
|
||||||
memset(pRet, 0, sizeof(SRspRet));
|
|
||||||
|
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
void ** handle = qAcquireQInfo(pVnode->qMgmt, pCancel->qhandle);
|
|
||||||
if (handle == NULL) {
|
|
||||||
code = terrno;
|
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
|
||||||
} else if ((*handle) != (void *)pCancel->qhandle) {
|
|
||||||
code = TSDB_CODE_QRY_INVALID_QHANDLE;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
vError("vgId:%d, invalid handle in cancel query, code:%s, QInfo:%p", pVnode->vgId, tstrerror(code), (void *)pCancel->qhandle);
|
|
||||||
vnodeBuildNoResultQueryRsp(pRet);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
vWarn("vgId:%d, QInfo:%p, cancel-query msg received to kill query and free qhandle", pVnode->vgId, *handle);
|
|
||||||
qKillQuery(*handle);
|
|
||||||
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
|
|
||||||
|
|
||||||
vnodeBuildNoResultQueryRsp(pRet);
|
|
||||||
return TSDB_CODE_TSC_QUERY_CANCELLED;
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue