[td-98] add the group by support for super table query
This commit is contained in:
parent
09189682e6
commit
130d4cd20c
|
@ -200,8 +200,9 @@ int32_t tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex, SQuer
|
|||
STableMetaInfo* tscGetMeterMetaInfoByUid(SQueryInfo* pQueryInfo, uint64_t uid, int32_t* index);
|
||||
void tscClearMeterMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache);
|
||||
|
||||
STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta, SArray* vgroupList,
|
||||
int16_t numOfTags, int16_t* tags);
|
||||
STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta,
|
||||
SVgroupsInfo* vgroupList, int16_t numOfTags, int16_t* tags);
|
||||
|
||||
STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo *pQueryInfo);
|
||||
int32_t tscAddSubqueryInfo(SSqlCmd *pCmd);
|
||||
void tscFreeSubqueryInfo(SSqlCmd* pCmd);
|
||||
|
|
|
@ -40,6 +40,8 @@ extern "C" {
|
|||
// forward declaration
|
||||
struct SSqlInfo;
|
||||
|
||||
typedef SCMSTableVgroupRspMsg SVgroupsInfo;
|
||||
|
||||
typedef struct SSqlGroupbyExpr {
|
||||
int16_t tableIndex;
|
||||
int16_t numOfGroupCols;
|
||||
|
@ -70,14 +72,12 @@ typedef struct STableMeta {
|
|||
|
||||
typedef struct STableMetaInfo {
|
||||
STableMeta * pTableMeta; // table meta, cached in client side and acquried by name
|
||||
// SSuperTableMeta *pMetricMeta; // metricmeta
|
||||
SArray* vgroupIdList;
|
||||
|
||||
SVgroupsInfo* vgroupList;
|
||||
/*
|
||||
* 1. keep the vnode index during the multi-vnode super table projection query
|
||||
* 2. keep the vnode index for multi-vnode insertion
|
||||
*/
|
||||
int32_t vnodeIndex;
|
||||
int32_t dnodeIndex;
|
||||
char name[TSDB_TABLE_ID_LEN]; // (super) table name
|
||||
int16_t numOfTags; // total required tags in query, including groupby tags
|
||||
int16_t tagColumnIndex[TSDB_MAX_TAGS]; // clause + tag projection
|
||||
|
@ -210,7 +210,6 @@ typedef struct STableDataBlocks {
|
|||
} STableDataBlocks;
|
||||
|
||||
typedef struct SDataBlockList {
|
||||
int32_t idx;
|
||||
uint32_t nSize;
|
||||
uint32_t nAlloc;
|
||||
STableDataBlocks **pData;
|
||||
|
@ -257,7 +256,6 @@ typedef struct {
|
|||
|
||||
union {
|
||||
bool existsCheck; // check if the table exists or not
|
||||
bool inStream; // denote if current sql is executed in stream or not
|
||||
bool autoCreated; // if the table is missing, on-the-fly create it. during getmeterMeta
|
||||
int8_t dataSourceType; // load data from file or not
|
||||
};
|
||||
|
|
|
@ -443,12 +443,12 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
|||
|
||||
if ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) == TSDB_QUERY_TYPE_STABLE_SUBQUERY) {
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pTableMetaInfo->vnodeIndex >= 0 && pSql->param != NULL);
|
||||
assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pTableMetaInfo->dnodeIndex >= 0 && pSql->param != NULL);
|
||||
|
||||
SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param;
|
||||
SSqlObj * pParObj = trs->pParentSqlObj;
|
||||
|
||||
assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vnodeIndex &&
|
||||
assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->dnodeIndex &&
|
||||
tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0);
|
||||
|
||||
tscTrace("%p get metricMeta during super table query successfully", pSql);
|
||||
|
|
|
@ -408,7 +408,7 @@ static int insertStmtReset(STscStmt* pStmt) {
|
|||
pCmd->batchSize = 0;
|
||||
|
||||
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
||||
pTableMetaInfo->vnodeIndex = 0;
|
||||
pTableMetaInfo->dnodeIndex = 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -438,7 +438,7 @@ static int insertStmtExecute(STscStmt* stmt) {
|
|||
}
|
||||
|
||||
// set the next sent data vnode index in data block arraylist
|
||||
pTableMetaInfo->vnodeIndex = 1;
|
||||
pTableMetaInfo->dnodeIndex = 1;
|
||||
} else {
|
||||
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
|
||||
}
|
||||
|
|
|
@ -2470,7 +2470,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd*
|
|||
const char* msg8 = "not allowed column type for group by";
|
||||
const char* msg9 = "tags not allowed for table query";
|
||||
|
||||
// todo : handle two meter situation
|
||||
// todo : handle two tables situation
|
||||
STableMetaInfo* pTableMetaInfo = NULL;
|
||||
|
||||
if (pList == NULL) {
|
||||
|
@ -2493,7 +2493,6 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd*
|
|||
SSQLToken token = {pVar->nLen, pVar->nType, pVar->pz};
|
||||
|
||||
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
|
||||
|
||||
if (getColumnIndexByName(&token, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
|
||||
return invalidSqlErrMsg(pQueryInfo->msg, msg2);
|
||||
}
|
||||
|
@ -2523,13 +2522,13 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd*
|
|||
return invalidSqlErrMsg(pQueryInfo->msg, msg9);
|
||||
}
|
||||
|
||||
int32_t relIndex = index.columnIndex;
|
||||
if (index.columnIndex != TSDB_TBNAME_COLUMN_INDEX) {
|
||||
relIndex -= tscGetNumOfColumns(pTableMeta);
|
||||
}
|
||||
// int32_t relIndex = index.columnIndex;
|
||||
// if (index.columnIndex != TSDB_TBNAME_COLUMN_INDEX) {
|
||||
// relIndex -= tscGetNumOfColumns(pTableMeta);
|
||||
// }
|
||||
|
||||
pQueryInfo->groupbyExpr.columnInfo[i] =
|
||||
(SColIndex){.colIndex = relIndex, .flag = TSDB_COL_TAG, .colId = pSchema->colId}; // relIndex;
|
||||
(SColIndex){.colIndex = index.columnIndex, .flag = TSDB_COL_TAG, .colId = pSchema->colId}; // relIndex;
|
||||
addRequiredTagColumn(pQueryInfo, pQueryInfo->groupbyExpr.columnInfo[i].colIndex, index.tableIndex);
|
||||
} else {
|
||||
// check if the column type is valid, here only support the bool/tinyint/smallint/bigint group by
|
||||
|
@ -5095,9 +5094,8 @@ static int32_t doAddGroupbyColumnsOnDemand(SQueryInfo* pQueryInfo) {
|
|||
bytes = TSDB_TABLE_NAME_LEN;
|
||||
name = TSQL_TBNAME_L;
|
||||
} else {
|
||||
colIndex = (TSDB_COL_IS_TAG(pColIndex->flag)) ? tscGetNumOfColumns(pTableMetaInfo->pTableMeta) + pColIndex->colIndex
|
||||
: pColIndex->colIndex;
|
||||
|
||||
// colIndex = (TSDB_COL_IS_TAG(pColIndex->flag)) ? tscGetNumOfColumns(pTableMetaInfo->pTableMeta) + pColIndex->colIndex
|
||||
// : pColIndex->colIndex;
|
||||
type = pSchema[colIndex].type;
|
||||
bytes = pSchema[colIndex].bytes;
|
||||
name = pSchema[colIndex].name;
|
||||
|
@ -5108,11 +5106,14 @@ static int32_t doAddGroupbyColumnsOnDemand(SQueryInfo* pQueryInfo) {
|
|||
|
||||
SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, pQueryInfo->exprsInfo.numOfExprs, TSDB_FUNC_TAG, &index,
|
||||
type, bytes, bytes);
|
||||
|
||||
|
||||
memset(pExpr->aliasName, 0, tListLen(pExpr->aliasName));
|
||||
strncpy(pExpr->aliasName, name, TSDB_COL_NAME_LEN);
|
||||
|
||||
pExpr->colInfo.flag = TSDB_COL_TAG;
|
||||
|
||||
// NOTE: tag column does not add to source column list
|
||||
SColumnList ids = {0};
|
||||
SColumnList ids = getColumnList(1, 0, pColIndex->colIndex);
|
||||
insertResultField(pQueryInfo, pQueryInfo->exprsInfo.numOfExprs-1, &ids, bytes, type, name, pExpr);
|
||||
} else {
|
||||
// if this query is "group by" normal column, interval is not allowed
|
||||
|
@ -5693,7 +5694,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
|
|||
assert(pQueryInfo->numOfTables == pQuerySql->from->nExpr);
|
||||
|
||||
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
|
||||
int32_t code = tscGetSTableVgroupInfo(pSql, index);
|
||||
code = tscGetSTableVgroupInfo(pSql, index);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -636,7 +636,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
|
|||
|
||||
pModel = createColumnModel(pSchema, pQueryInfo->exprsInfo.numOfExprs, capacity);
|
||||
|
||||
size_t numOfSubs = taosArrayGetSize(pTableMetaInfo->vgroupIdList);
|
||||
size_t numOfSubs = pTableMetaInfo->vgroupList->numOfDnodes;
|
||||
for (int32_t i = 0; i < numOfSubs; ++i) {
|
||||
(*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pModel);
|
||||
(*pMemBuffer)[i]->flushModel = MULTIPLE_APPEND_MODEL;
|
||||
|
|
|
@ -413,7 +413,7 @@ int tscProcessSql(SSqlObj *pSql) {
|
|||
|
||||
type = pQueryInfo->type;
|
||||
|
||||
// for hearbeat, numOfTables == 0;
|
||||
// for heartbeat, numOfTables == 0;
|
||||
assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
|
||||
}
|
||||
|
||||
|
@ -424,19 +424,6 @@ int tscProcessSql(SSqlObj *pSql) {
|
|||
pSql->res.code = TSDB_CODE_OTHERS;
|
||||
return pSql->res.code;
|
||||
}
|
||||
|
||||
// temp
|
||||
// pSql->ipList = tscMgmtIpList;
|
||||
// if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
|
||||
// pSql->index = pTableMetaInfo->pTableMeta->index;
|
||||
// } else { // it must be the parent SSqlObj for super table query
|
||||
// if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) != 0) {
|
||||
// int32_t idx = pTableMetaInfo->vnodeIndex;
|
||||
//
|
||||
// SVnodeSidList *pSidList = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx);
|
||||
// pSql->index = pSidList->index;
|
||||
// }
|
||||
// }
|
||||
} else if (pSql->cmd.command < TSDB_SQL_LOCAL) {
|
||||
pSql->ipList = tscMgmtIpList;
|
||||
} else { // local handler
|
||||
|
@ -522,8 +509,17 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
pRetrieveMsg->free = htons(pQueryInfo->type);
|
||||
pMsg += sizeof(pQueryInfo->type);
|
||||
|
||||
STableMeta* pTableMeta = pQueryInfo->pTableMetaInfo[0]->pTableMeta;
|
||||
pRetrieveMsg->header.vgId = htonl(pTableMeta->vgId);
|
||||
// todo valid the vgroupId at the client side
|
||||
if (UTIL_TABLE_IS_SUPERTABLE(pQueryInfo->pTableMetaInfo[0])) {
|
||||
SVgroupsInfo* pVgroupInfo = pQueryInfo->pTableMetaInfo[0]->vgroupList;
|
||||
assert(pVgroupInfo->dnodeVgroups->numOfVgroups == 1); // todo fix me
|
||||
|
||||
pRetrieveMsg->header.vgId = htonl(pVgroupInfo->dnodeVgroups[0].vgId[0]);
|
||||
} else {
|
||||
STableMeta* pTableMeta = pQueryInfo->pTableMetaInfo[0]->pTableMeta;
|
||||
pRetrieveMsg->header.vgId = htonl(pTableMeta->vgId);
|
||||
}
|
||||
|
||||
pMsg += sizeof(SRetrieveTableMsg);
|
||||
|
||||
pRetrieveMsg->header.contLen = htonl(pSql->cmd.payloadLen);
|
||||
|
@ -584,7 +580,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
|
|||
|
||||
#if 0
|
||||
SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
|
||||
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex);
|
||||
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->dnodeIndex);
|
||||
|
||||
int32_t meterInfoSize = (pMetricMeta->tagLen + sizeof(STableIdInfo)) * pVnodeSidList->numOfSids;
|
||||
int32_t outputColumnSize = pQueryInfo->exprsInfo.numOfExprs * sizeof(SSqlFuncExprMsg);
|
||||
|
@ -655,21 +651,22 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
pQueryMsg->head.vgId = htonl(pTableMeta->vgId);
|
||||
tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name);
|
||||
} else { // query super table
|
||||
if (pTableMetaInfo->vnodeIndex < 0) {
|
||||
tscError("%p error vnodeIdx:%d", pSql, pTableMetaInfo->vnodeIndex);
|
||||
|
||||
if (pTableMetaInfo->dnodeIndex < 0) {
|
||||
tscError("%p error vnodeIdx:%d", pSql, pTableMetaInfo->dnodeIndex);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pSql->ipList.numOfIps = taosArrayGetSize(pTableMetaInfo->vgroupIdList);
|
||||
pSql->ipList.numOfIps = 1; // todo fix me
|
||||
pSql->ipList.port = tsDnodeShellPort;
|
||||
pSql->ipList.inUse = 0;
|
||||
|
||||
for(int32_t i = 0; i < pSql->ipList.numOfIps; ++i) {
|
||||
pSql->ipList.ip[i] = *(uint32_t*) taosArrayGet(pTableMetaInfo->vgroupIdList, i);
|
||||
}
|
||||
|
||||
// todo extract method
|
||||
STableDnodeVgroupInfo* pVgroupInfo = &pTableMetaInfo->vgroupList->dnodeVgroups[pTableMetaInfo->dnodeIndex];
|
||||
pSql->ipList.ip[0] = pVgroupInfo->ipAddr.ip;
|
||||
|
||||
#if 0
|
||||
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex);
|
||||
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->dnodeIndex);
|
||||
uint32_t vnodeId = pVnodeSidList->vpeerDesc[pVnodeSidList->index].vnode;
|
||||
|
||||
numOfTables = pVnodeSidList->numOfSids;
|
||||
|
@ -679,9 +676,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
}
|
||||
#endif
|
||||
|
||||
uint32_t vnodeId = 1;
|
||||
tscTrace("%p query on vid:%d, number of tables:%d", pSql, vnodeId, numOfTables);
|
||||
pQueryMsg->head.vgId = htonl(vnodeId);
|
||||
tscTrace("%p query on super table, numOfVgroup:%d, dnodeIndex:%d", pSql, pVgroupInfo->numOfVgroups,
|
||||
pTableMetaInfo->dnodeIndex);
|
||||
|
||||
pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId[0]);
|
||||
numOfTables = 1;
|
||||
}
|
||||
|
||||
|
@ -859,7 +857,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
int32_t numOfBlocks = 0;
|
||||
|
||||
if (pQueryInfo->tsBuf != NULL) {
|
||||
STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->vnodeIndex);
|
||||
STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->dnodeIndex);
|
||||
assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL); // this query should not be sent
|
||||
|
||||
// todo refactor
|
||||
|
@ -1851,7 +1849,6 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
|
|||
}
|
||||
|
||||
for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
|
||||
pMetaMsg->vpeerDesc[i].vgId = htonl(pMetaMsg->vpeerDesc[i].vgId);
|
||||
pMetaMsg->vpeerDesc[i].ip = htonl(pMetaMsg->vpeerDesc[i].ip);
|
||||
pMetaMsg->vpeerDesc[i].dnodeId = htonl(pMetaMsg->vpeerDesc[i].dnodeId);
|
||||
}
|
||||
|
@ -2116,21 +2113,30 @@ _error_clean:
|
|||
free(sizes);
|
||||
free(metricMetaList);
|
||||
#endif
|
||||
SSqlRes* pRes = &pSql->res;
|
||||
|
||||
SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pSql->res.pRsp;
|
||||
SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pRes->pRsp;
|
||||
pStableVgroup->numOfDnodes = htonl(pStableVgroup->numOfDnodes);
|
||||
|
||||
SSqlObj* pparent = pSql->param;
|
||||
assert(pparent != NULL);
|
||||
// master sqlObj locates in param
|
||||
SSqlObj* parent = pSql->param;
|
||||
assert(parent != NULL);
|
||||
|
||||
SSqlCmd* pCmd = &pparent->cmd;
|
||||
SSqlCmd* pCmd = &parent->cmd;
|
||||
STableMetaInfo* pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
||||
pInfo->vgroupIdList = taosArrayInit(pStableVgroup->numOfDnodes, sizeof(int32_t));
|
||||
|
||||
// todo opt performance
|
||||
for(int32_t i = 0; i < pStableVgroup->numOfDnodes; ++i) {
|
||||
int32_t ip = htonl(pStableVgroup->dnodeIps[i]);
|
||||
taosArrayPush(pInfo->vgroupIdList, &ip);
|
||||
pInfo->vgroupList = malloc(pRes->rspLen);
|
||||
memcpy(pInfo->vgroupList, pStableVgroup, pRes->rspLen);
|
||||
|
||||
for(int32_t i = 0; i < pInfo->vgroupList->numOfDnodes; ++i) {
|
||||
STableDnodeVgroupInfo* pVgroups = &pInfo->vgroupList->dnodeVgroups[i];
|
||||
pVgroups->numOfVgroups = htonl(pVgroups->numOfVgroups);
|
||||
pVgroups->ipAddr.ip = htonl(pVgroups->ipAddr.ip);
|
||||
pVgroups->ipAddr.port = htons(pVgroups->ipAddr.port);
|
||||
|
||||
for(int32_t j = 0; j < pVgroups->numOfVgroups; ++j) {
|
||||
pVgroups->vgId[j] = htonl(pVgroups->vgId[j]);
|
||||
}
|
||||
}
|
||||
|
||||
return pSql->res.code;
|
||||
|
@ -2492,7 +2498,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
|
|||
// bool required = false;
|
||||
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
|
||||
if (pQueryInfo->pTableMetaInfo[0]->vgroupIdList != NULL) {
|
||||
if (pQueryInfo->pTableMetaInfo[0]->vgroupList != NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -852,10 +852,9 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
|
|||
}
|
||||
} else {
|
||||
// if no free resource msg is sent to vnode, we free this object immediately.
|
||||
bool free = tscShouldFreeAsyncSqlObj(pSql);
|
||||
if (free) {
|
||||
assert(pRes->numOfRows == 0 || (pCmd->command > TSDB_SQL_LOCAL));
|
||||
|
||||
STscObj* pTscObj = pSql->pTscObj;
|
||||
|
||||
if (pTscObj->pSql != pSql) {
|
||||
tscFreeSqlObj(pSql);
|
||||
tscTrace("%p sql result is freed by app", pSql);
|
||||
} else {
|
||||
|
|
|
@ -517,7 +517,6 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
|
|||
return NULL;
|
||||
}
|
||||
|
||||
pSql->cmd.inStream = 1; // 1 means sql in stream, allowed the sliding clause.
|
||||
pRes->code = tscToSQLCmd(pSql, &SQLInfo);
|
||||
SQLInfoDestroy(&SQLInfo);
|
||||
|
||||
|
|
|
@ -382,7 +382,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
|
|||
pSql->cmd.command = TSDB_SQL_SELECT;
|
||||
pQueryInfo->type = type;
|
||||
|
||||
tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->vnodeIndex = 0;
|
||||
tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->dnodeIndex = 0;
|
||||
}
|
||||
|
||||
tscDoQuery(pSql);
|
||||
|
|
|
@ -341,8 +341,8 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
|
|||
|
||||
tscPrintSelectClause(pNew, 0);
|
||||
|
||||
tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
|
||||
pSql, pNew, 0, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type,
|
||||
tscTrace("%p subquery:%p tableIndex:%d, dnodeIndex:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
|
||||
pSql, pNew, 0, pTableMetaInfo->dnodeIndex, pNewQueryInfo->type,
|
||||
pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
|
||||
pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name);
|
||||
}
|
||||
|
@ -457,7 +457,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
|
|||
assert(pQueryInfo->numOfTables == 1); // for subquery, only one metermetaInfo
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
|
||||
tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vnodeIndex);
|
||||
tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->dnodeIndex);
|
||||
tsBufDestory(pBuf);
|
||||
}
|
||||
|
||||
|
@ -478,9 +478,9 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
|
|||
// for projection query, need to try next vnode
|
||||
// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
|
||||
int32_t totalVnode = 0;
|
||||
if ((++pTableMetaInfo->vnodeIndex) < totalVnode) {
|
||||
if ((++pTableMetaInfo->dnodeIndex) < totalVnode) {
|
||||
tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql,
|
||||
pTableMetaInfo->vnodeIndex - 1, pTableMetaInfo->vnodeIndex, totalVnode, pRes->numOfTotal);
|
||||
pTableMetaInfo->dnodeIndex - 1, pTableMetaInfo->dnodeIndex, totalVnode, pRes->numOfTotal);
|
||||
|
||||
pSql->cmd.command = TSDB_SQL_SELECT;
|
||||
pSql->fp = tscJoinQueryCallback;
|
||||
|
@ -542,7 +542,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
|
|||
assert(pQueryInfo->numOfTables == 1);
|
||||
|
||||
// for projection query, need to try next vnode if current vnode is exhausted
|
||||
// if ((++pTableMetaInfo->vnodeIndex) < pTableMetaInfo->pMetricMeta->numOfVnodes) {
|
||||
// if ((++pTableMetaInfo->dnodeIndex) < pTableMetaInfo->pMetricMeta->numOfVnodes) {
|
||||
// pSupporter->pState->numOfCompleted = 0;
|
||||
// pSupporter->pState->numOfTotal = 1;
|
||||
//
|
||||
|
@ -609,7 +609,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
|
|||
// STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
|
||||
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
|
||||
// if (pRes->row >= pRes->numOfRows && pTableMetaInfo->vnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes &&
|
||||
// if (pRes->row >= pRes->numOfRows && pTableMetaInfo->dnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes &&
|
||||
// (!tscHasReachLimitation(pQueryInfo, pRes))) {
|
||||
// numOfFetch++;
|
||||
// }
|
||||
|
@ -647,8 +647,8 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
|
|||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
|
||||
if (pRes1->row >= pRes1->numOfRows) {
|
||||
tscTrace("%p subquery:%p retrieve data from vnode, subquery:%d, vnodeIndex:%d", pSql, pSql1,
|
||||
pSupporter->subqueryIndex, pTableMetaInfo->vnodeIndex);
|
||||
tscTrace("%p subquery:%p retrieve data from vnode, subquery:%d, dnodeIndex:%d", pSql, pSql1,
|
||||
pSupporter->subqueryIndex, pTableMetaInfo->dnodeIndex);
|
||||
|
||||
tscResetForNextRetrieve(pRes1);
|
||||
pSql1->fp = joinRetrieveCallback;
|
||||
|
@ -785,11 +785,11 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
|
|||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
|
||||
/**
|
||||
* if the query is a continue query (vnodeIndex > 0 for projection query) for next vnode, do the retrieval of
|
||||
* if the query is a continue query (dnodeIndex > 0 for projection query) for next vnode, do the retrieval of
|
||||
* data instead of returning to its invoker
|
||||
*/
|
||||
if (pTableMetaInfo->vnodeIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
|
||||
// assert(pTableMetaInfo->vnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes);
|
||||
if (pTableMetaInfo->dnodeIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
|
||||
// assert(pTableMetaInfo->dnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes);
|
||||
pSupporter->pState->numOfCompleted = 0; // reset the record value
|
||||
|
||||
pSql->fp = joinRetrieveCallback; // continue retrieve data
|
||||
|
@ -897,14 +897,14 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
|
|||
|
||||
tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, "
|
||||
"exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
|
||||
pSql, pNew, tableIndex, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type,
|
||||
pSql, pNew, tableIndex, pTableMetaInfo->dnodeIndex, pNewQueryInfo->type,
|
||||
pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
|
||||
pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name);
|
||||
tscPrintSelectClause(pNew, 0);
|
||||
|
||||
tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, "
|
||||
"exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
|
||||
pSql, pNew, tableIndex, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type,
|
||||
pSql, pNew, tableIndex, pTableMetaInfo->dnodeIndex, pNewQueryInfo->type,
|
||||
pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
|
||||
pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name);
|
||||
tscPrintSelectClause(pNew, 0);
|
||||
|
@ -1005,7 +1005,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
|
|||
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
|
||||
pSql->numOfSubs = taosArrayGetSize(pTableMetaInfo->vgroupIdList);
|
||||
pSql->numOfSubs = pTableMetaInfo->vgroupList->numOfDnodes;
|
||||
assert(pSql->numOfSubs > 0);
|
||||
|
||||
int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize);
|
||||
|
@ -1111,6 +1111,7 @@ static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) {
|
|||
}
|
||||
|
||||
static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows);
|
||||
static void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows);
|
||||
|
||||
static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t errCode) {
|
||||
// set no disk space error info
|
||||
|
@ -1132,10 +1133,10 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES
|
|||
|
||||
pthread_mutex_unlock(&trsupport->queryMutex);
|
||||
|
||||
tscRetrieveFromDnodeCallBack(trsupport, tres, trsupport->pState->code);
|
||||
tscHandleSubqueryError(trsupport, tres, trsupport->pState->code);
|
||||
}
|
||||
|
||||
static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) {
|
||||
void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) {
|
||||
SSqlObj *pPObj = trsupport->pParentSqlObj;
|
||||
int32_t subqueryIndex = trsupport->subqueryIndex;
|
||||
|
||||
|
@ -1144,9 +1145,9 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq
|
|||
assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
|
||||
pPObj->numOfSubs == pState->numOfTotal);
|
||||
|
||||
/* retrieved in subquery failed. OR query cancelled in retrieve phase. */
|
||||
// retrieved in subquery failed. OR query cancelled in retrieve phase.
|
||||
if (pState->code == TSDB_CODE_SUCCESS && pPObj->res.code != TSDB_CODE_SUCCESS) {
|
||||
pState->code = -(int)pPObj->res.code;
|
||||
pState->code = pPObj->res.code;
|
||||
|
||||
/*
|
||||
* kill current sub-query connection, which may retrieve data from vnodes;
|
||||
|
@ -1179,7 +1180,7 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq
|
|||
|
||||
SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql);
|
||||
if (pNew == NULL) {
|
||||
tscError("%p sub:%p failed to create new subquery sqlobj due to out of memory, abort retry",
|
||||
tscError("%p sub:%p failed to create new subquery sqlObj due to out of memory, abort retry",
|
||||
trsupport->pParentSqlObj, pSql);
|
||||
|
||||
pState->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
|
||||
|
@ -1235,10 +1236,13 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
|
|||
SSubqueryState* pState = trsupport->pState;
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
|
||||
|
||||
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
|
||||
|
||||
// data in from current vnode is stored in cache and disk
|
||||
// uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->numOfElems;
|
||||
// tscTrace("%p sub:%p all data retrieved from ip:%u,vid:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, pSvd->ip,
|
||||
// pSvd->vnode, numOfRowsFromSubquery, idx);
|
||||
uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->numOfElems;
|
||||
tscTrace("%p sub:%p all data retrieved from ip:%u,vgId:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql,
|
||||
pTableMetaInfo->vgroupList->dnodeVgroups[0].ipAddr.ip, pTableMetaInfo->vgroupList->dnodeVgroups[0].vgId[0],
|
||||
numOfRowsFromSubquery, idx);
|
||||
|
||||
tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity);
|
||||
|
||||
|
@ -1326,7 +1330,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
|
|||
pthread_mutex_lock(&trsupport->queryMutex);
|
||||
|
||||
if (numOfRows < 0 || pState->code < 0 || pPObj->res.code != TSDB_CODE_SUCCESS) {
|
||||
return tscHandleSubRetrievalError(trsupport, pSql, numOfRows);
|
||||
return tscHandleSubqueryError(trsupport, pSql, numOfRows);
|
||||
}
|
||||
|
||||
SSqlRes * pRes = &pSql->res;
|
||||
|
@ -1352,7 +1356,6 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
|
|||
return;
|
||||
}
|
||||
|
||||
|
||||
#ifdef _DEBUG_VIEW
|
||||
printf("received data from vnode: %d rows\n", pRes->numOfRows);
|
||||
SSrcColumnInfo colInfo[256] = {0};
|
||||
|
@ -1360,6 +1363,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
|
|||
tscGetSrcColumnInfo(colInfo, pQueryInfo);
|
||||
tColModelDisplayEx(pDesc->pColumnModel, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo);
|
||||
#endif
|
||||
|
||||
if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) {
|
||||
tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql,
|
||||
tsAvailTmpDirGB, tsMinimalTmpDirGB);
|
||||
|
@ -1371,8 +1375,11 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
|
|||
pRes->numOfRows, pQueryInfo->groupbyExpr.orderType);
|
||||
if (ret < 0) { // set no disk space error info, and abort retry
|
||||
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
|
||||
|
||||
} else if (pRes->completed) {
|
||||
tscAllDataRetrievedFromDnode(trsupport, pSql);
|
||||
return;
|
||||
|
||||
} else { // continue fetch data from dnode
|
||||
pthread_mutex_unlock(&trsupport->queryMutex);
|
||||
taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
|
||||
|
@ -1380,6 +1387,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
|
|||
} else { // all data has been retrieved to client
|
||||
tscAllDataRetrievedFromDnode(trsupport, pSql);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&trsupport->queryMutex);
|
||||
}
|
||||
|
||||
|
@ -1393,9 +1401,9 @@ static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsu
|
|||
|
||||
assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1);
|
||||
|
||||
// launch subquery for each vnode, so the subquery index equals to the vnodeIndex.
|
||||
// launch subquery for each vnode, so the subquery index equals to the dnodeIndex.
|
||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, table_index);
|
||||
pTableMetaInfo->vnodeIndex = trsupport->subqueryIndex;
|
||||
pTableMetaInfo->dnodeIndex = trsupport->subqueryIndex;
|
||||
|
||||
pSql->pSubs[trsupport->subqueryIndex] = pNew;
|
||||
}
|
||||
|
@ -1404,37 +1412,34 @@ static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsu
|
|||
}
|
||||
|
||||
void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
|
||||
SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
|
||||
SRetrieveSupport *trsupport = (SRetrieveSupport *) param;
|
||||
|
||||
SSqlObj* pParentSql = trsupport->pParentSqlObj;
|
||||
SSqlObj* pSql = (SSqlObj *)tres;
|
||||
SSqlObj* pSql = (SSqlObj *) tres;
|
||||
|
||||
// STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
|
||||
assert(pSql->cmd.numOfClause == 1 && pSql->cmd.pQueryInfo[0]->numOfTables == 1);
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
|
||||
assert(pSql->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1);
|
||||
|
||||
// int32_t idx = pTableMetaInfo->vnodeIndex;
|
||||
|
||||
SVnodeSidList *vnodeInfo = NULL;
|
||||
SVnodeDesc * pSvd = NULL;
|
||||
// if (pTableMetaInfo->pMetricMeta != NULL) {
|
||||
// vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx);
|
||||
// pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];
|
||||
// }
|
||||
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
|
||||
STableDnodeVgroupInfo* pVgroupInfo = &pTableMetaInfo->vgroupList->dnodeVgroups[0];
|
||||
|
||||
SSubqueryState* pState = trsupport->pState;
|
||||
assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
|
||||
pParentSql->numOfSubs == pState->numOfTotal);
|
||||
|
||||
if (pParentSql->res.code != TSDB_CODE_SUCCESS || pState->code != TSDB_CODE_SUCCESS) {
|
||||
// metric query is killed, Note: code must be less than 0
|
||||
|
||||
// stable query is killed, abort further retry
|
||||
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
|
||||
|
||||
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
|
||||
code = -(int)(pParentSql->res.code);
|
||||
code = pParentSql->res.code;
|
||||
} else {
|
||||
code = pState->code;
|
||||
}
|
||||
tscTrace("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%d", pParentSql, pSql,
|
||||
trsupport->subqueryIndex, code);
|
||||
|
||||
tscTrace("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%s", pParentSql, pSql,
|
||||
trsupport->subqueryIndex, tstrerror(code));
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -1442,51 +1447,40 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
|
|||
* than this one are actually not necessary, we simply call the tscRetrieveFromDnodeCallBack
|
||||
* function to abort current and remain retrieve process.
|
||||
*
|
||||
* NOTE: threadsafe is required.
|
||||
* NOTE: thread safe is required.
|
||||
*/
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
if (trsupport->numOfRetry++ >= MAX_NUM_OF_SUBQUERY_RETRY) {
|
||||
tscTrace("%p sub:%p reach the max retry count,set global code:%d", pParentSql, pSql, code);
|
||||
tscTrace("%p sub:%p reach the max retry times, set global code:%d", pParentSql, pSql, code);
|
||||
atomic_val_compare_exchange_32(&pState->code, 0, code);
|
||||
} else { // does not reach the maximum retry count, go on
|
||||
} else { // does not reach the maximum retry time, go on
|
||||
tscTrace("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry);
|
||||
|
||||
SSqlObj *pNew = tscCreateSqlObjForSubquery(pParentSql, trsupport, pSql);
|
||||
if (pNew == NULL) {
|
||||
tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vid:%d, orderOfSub:%d",
|
||||
trsupport->pParentSqlObj, pSql, pSvd != NULL ? pSvd->vgId : -1, trsupport->subqueryIndex);
|
||||
tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vgId:%d, orderOfSub:%d",
|
||||
trsupport->pParentSqlObj, pSql, pVgroupInfo->vgId[0], trsupport->subqueryIndex);
|
||||
|
||||
pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY;
|
||||
pState->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
|
||||
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
|
||||
} else {
|
||||
// SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
|
||||
// assert(pNewQueryInfo->pTableMetaInfo[0]->pTableMeta != NULL && pNewQueryInfo->pTableMetaInfo[0]->pMetricMeta != NULL);
|
||||
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
|
||||
assert(pNewQueryInfo->pTableMetaInfo[0]->pTableMeta != NULL);
|
||||
|
||||
tscProcessSql(pNew);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (pState->code != TSDB_CODE_SUCCESS) { // failed, abort
|
||||
if (vnodeInfo != NULL) {
|
||||
tscTrace("%p sub:%p query failed,ip:%u,vid:%d,orderOfSub:%d,global code:%d", pParentSql, pSql,
|
||||
vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vgId,
|
||||
trsupport->subqueryIndex, pState->code);
|
||||
} else {
|
||||
tscTrace("%p sub:%p query failed,orderOfSub:%d,global code:%d", pParentSql, pSql,
|
||||
trsupport->subqueryIndex, pState->code);
|
||||
}
|
||||
|
||||
tscRetrieveFromDnodeCallBack(param, tres, pState->code);
|
||||
if (pState->code != TSDB_CODE_SUCCESS) { // at least one peer subquery failed, abort current query
|
||||
tscTrace("%p sub:%p query failed,ip:%u,vgId:%d,orderOfSub:%d,global code:%d", pParentSql, pSql,
|
||||
pVgroupInfo->ipAddr.ip, pVgroupInfo->vgId[0], trsupport->subqueryIndex, pState->code);
|
||||
|
||||
tscHandleSubqueryError(param, tres, pState->code);
|
||||
} else { // success, proceed to retrieve data from dnode
|
||||
if (vnodeInfo != NULL) {
|
||||
tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
|
||||
vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vgId,
|
||||
trsupport->subqueryIndex);
|
||||
} else {
|
||||
tscTrace("%p sub:%p query complete, orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
|
||||
trsupport->subqueryIndex);
|
||||
}
|
||||
tscTrace("%p sub:%p query complete, ip:%u, vgId:%d, orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
|
||||
pVgroupInfo->ipAddr.ip, pVgroupInfo->vgId[0], trsupport->subqueryIndex);
|
||||
|
||||
taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
|
||||
}
|
||||
|
|
|
@ -1178,8 +1178,9 @@ SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functi
|
|||
pExprInfo->pExprs[index] = pExpr;
|
||||
|
||||
pExpr->functionId = functionId;
|
||||
|
||||
int16_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
|
||||
|
||||
|
||||
// set the correct column index
|
||||
if (pColIndex->columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
pExpr->colInfo.colId = TSDB_TBNAME_COLUMN_INDEX;
|
||||
|
@ -1190,7 +1191,6 @@ SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functi
|
|||
|
||||
// tag columns require the column index revised.
|
||||
if (pColIndex->columnIndex >= numOfCols) {
|
||||
pColIndex->columnIndex -= numOfCols;
|
||||
pExpr->colInfo.flag = TSDB_COL_TAG;
|
||||
} else {
|
||||
if (pColIndex->columnIndex != TSDB_TBNAME_COLUMN_INDEX) {
|
||||
|
@ -1916,7 +1916,7 @@ void tscFreeSubqueryInfo(SSqlCmd* pCmd) {
|
|||
}
|
||||
|
||||
STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta,
|
||||
SArray* vgroupList, int16_t numOfTags, int16_t* tags) {
|
||||
SVgroupsInfo* vgroupList, int16_t numOfTags, int16_t* tags) {
|
||||
void* pAlloc = realloc(pQueryInfo->pTableMetaInfo, (pQueryInfo->numOfTables + 1) * POINTER_BYTES);
|
||||
if (pAlloc == NULL) {
|
||||
return NULL;
|
||||
|
@ -1937,7 +1937,12 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
|
|||
pTableMetaInfo->numOfTags = numOfTags;
|
||||
|
||||
if (vgroupList != NULL) {
|
||||
pTableMetaInfo->vgroupIdList = taosArrayClone(vgroupList);
|
||||
assert(vgroupList->numOfDnodes == 1); // todo fix me
|
||||
size_t size = sizeof(SVgroupsInfo) + (sizeof(STableDnodeVgroupInfo) +
|
||||
vgroupList->dnodeVgroups[0].numOfVgroups * sizeof(int32_t)) * vgroupList->numOfDnodes;
|
||||
|
||||
pTableMetaInfo->vgroupList = malloc(size);
|
||||
memcpy(pTableMetaInfo->vgroupList, vgroupList, size);
|
||||
}
|
||||
|
||||
if (tags != NULL) {
|
||||
|
@ -1952,7 +1957,7 @@ STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo* pQueryInfo) {
|
|||
return tscAddTableMetaInfo(pQueryInfo, NULL, NULL, NULL, 0, NULL);
|
||||
}
|
||||
|
||||
void doRemoveMeterMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFromCache) {
|
||||
void doRemoveTableMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFromCache) {
|
||||
if (index < 0 || index >= pQueryInfo->numOfTables) {
|
||||
return;
|
||||
}
|
||||
|
@ -1975,7 +1980,7 @@ void tscRemoveAllMeterMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool
|
|||
|
||||
int32_t index = pQueryInfo->numOfTables;
|
||||
while (index >= 0) {
|
||||
doRemoveMeterMetaInfo(pQueryInfo, --index, removeFromCache);
|
||||
doRemoveTableMetaInfo(pQueryInfo, --index, removeFromCache);
|
||||
}
|
||||
|
||||
tfree(pQueryInfo->pTableMetaInfo);
|
||||
|
@ -1987,6 +1992,7 @@ void tscClearMeterMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache)
|
|||
}
|
||||
|
||||
taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), removeFromCache);
|
||||
tfree(pTableMetaInfo->vgroupList);
|
||||
// taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pMetricMeta), removeFromCache);
|
||||
}
|
||||
|
||||
|
@ -2014,7 +2020,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
|
|||
|
||||
pNew->sqlstr = strdup(pSql->sqlstr);
|
||||
if (pNew->sqlstr == NULL) {
|
||||
tscError("%p new subquery failed, tableIndex:%d, vnodeIndex:%d", pSql, tableIndex, pTableMetaInfo->vnodeIndex);
|
||||
tscError("%p new subquery failed, tableIndex:%d, dnodeIndex:%d", pSql, tableIndex, pTableMetaInfo->dnodeIndex);
|
||||
|
||||
free(pNew);
|
||||
return NULL;
|
||||
|
@ -2058,7 +2064,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
|
|||
}
|
||||
|
||||
if (tscAllocPayload(pnCmd, TSDB_DEFAULT_PAYLOAD_SIZE) != TSDB_CODE_SUCCESS) {
|
||||
tscError("%p new subquery failed, tableIndex:%d, vnodeIndex:%d", pSql, tableIndex, pTableMetaInfo->vnodeIndex);
|
||||
tscError("%p new subquery failed, tableIndex:%d, dnodeIndex:%d", pSql, tableIndex, pTableMetaInfo->dnodeIndex);
|
||||
tscFreeSqlObj(pNew);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -2128,7 +2134,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
|
|||
// pMetricMeta = taosCacheAcquireByName(tscCacheHandle, key);
|
||||
// }
|
||||
|
||||
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupIdList, pTableMetaInfo->numOfTags,
|
||||
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList, pTableMetaInfo->numOfTags,
|
||||
pTableMetaInfo->tagColumnIndex);
|
||||
} else { // transfer the ownership of pTableMeta/pMetricMeta to the newly create sql object.
|
||||
// STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);
|
||||
|
@ -2149,13 +2155,13 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
|
|||
tscTrace(
|
||||
"%p new subquery: %p, tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d,"
|
||||
"fieldInfo:%d, name:%s, qrang:%" PRId64 " - %" PRId64 " order:%d, limit:%" PRId64,
|
||||
pSql, pNew, tableIndex, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs,
|
||||
pSql, pNew, tableIndex, pTableMetaInfo->dnodeIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs,
|
||||
pNewQueryInfo->colList.numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pFinalInfo->name, pNewQueryInfo->stime,
|
||||
pNewQueryInfo->etime, pNewQueryInfo->order.order, pNewQueryInfo->limit.limit);
|
||||
|
||||
tscPrintSelectClause(pNew, 0);
|
||||
} else {
|
||||
tscTrace("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->vnodeIndex);
|
||||
tscTrace("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->dnodeIndex);
|
||||
}
|
||||
|
||||
return pNew;
|
||||
|
@ -2252,7 +2258,7 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) {
|
|||
|
||||
// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
|
||||
// return pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) &&
|
||||
// (!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vnodeIndex < totalVnode - 1);
|
||||
// (!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->dnodeIndex < totalVnode - 1);
|
||||
}
|
||||
|
||||
void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
|
||||
|
@ -2271,9 +2277,9 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
|
|||
int32_t totalVnode = 0;
|
||||
// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
|
||||
|
||||
while (++pTableMetaInfo->vnodeIndex < totalVnode) {
|
||||
while (++pTableMetaInfo->dnodeIndex < totalVnode) {
|
||||
tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql,
|
||||
pTableMetaInfo->vnodeIndex - 1, pTableMetaInfo->vnodeIndex, totalVnode, pRes->numOfTotalInCurrentClause);
|
||||
pTableMetaInfo->dnodeIndex - 1, pTableMetaInfo->dnodeIndex, totalVnode, pRes->numOfTotalInCurrentClause);
|
||||
|
||||
/*
|
||||
* update the limit and offset value for the query on the next vnode,
|
||||
|
@ -2292,7 +2298,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
|
|||
|
||||
assert((pRes->offset >= 0 && pRes->numOfRows == 0) || (pRes->offset == 0 && pRes->numOfRows >= 0));
|
||||
tscTrace("%p new query to next vnode, vnode index:%d, limit:%" PRId64 ", offset:%" PRId64 ", glimit:%" PRId64, pSql,
|
||||
pTableMetaInfo->vnodeIndex, pQueryInfo->limit.limit, pQueryInfo->limit.offset, pQueryInfo->clauseLimit);
|
||||
pTableMetaInfo->dnodeIndex, pQueryInfo->limit.limit, pQueryInfo->limit.offset, pQueryInfo->clauseLimit);
|
||||
|
||||
/*
|
||||
* For project query with super table join, the numOfSub is equalled to the number of all subqueries.
|
||||
|
|
|
@ -188,6 +188,11 @@ extern char *taosMsg[];
|
|||
|
||||
#pragma pack(push, 1)
|
||||
|
||||
typedef struct {
|
||||
uint32_t ip;
|
||||
uint16_t port;
|
||||
} SIpAddr;
|
||||
|
||||
typedef struct {
|
||||
int32_t numOfVnodes;
|
||||
} SMsgDesc;
|
||||
|
@ -469,7 +474,6 @@ typedef struct {
|
|||
int16_t numOfGroupCols; // num of group by columns
|
||||
int16_t orderByIdx;
|
||||
int16_t orderType; // used in group by xx order by xxx
|
||||
uint64_t groupbyTagIds;
|
||||
int64_t limit;
|
||||
int64_t offset;
|
||||
uint16_t queryType; // denote another query process
|
||||
|
@ -616,9 +620,15 @@ typedef struct SCMSTableVgroupMsg {
|
|||
char tableId[TSDB_TABLE_ID_LEN];
|
||||
} SCMSTableVgroupMsg;
|
||||
|
||||
typedef struct {
|
||||
SIpAddr ipAddr;
|
||||
int32_t numOfVgroups;
|
||||
int32_t vgId[];
|
||||
} STableDnodeVgroupInfo;
|
||||
|
||||
typedef struct {
|
||||
int32_t numOfDnodes;
|
||||
uint32_t dnodeIps[];
|
||||
STableDnodeVgroupInfo dnodeVgroups[];
|
||||
} SCMSTableVgroupRspMsg;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -1103,12 +1103,26 @@ static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) {
|
|||
if (pRsp == NULL) {
|
||||
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE);
|
||||
return;
|
||||
}
|
||||
|
||||
pRsp->numOfDnodes = htonl(1);
|
||||
pRsp->dnodeIps[0] = htonl(inet_addr(tsPrivateIp));
|
||||
}
|
||||
|
||||
int32_t numOfVgroups = 1;
|
||||
int32_t numOfDnodes = 1;
|
||||
|
||||
pRsp->numOfDnodes = htonl(numOfDnodes);
|
||||
STableDnodeVgroupInfo* pVgroupInfo = pRsp->dnodeVgroups;
|
||||
pVgroupInfo->ipAddr.ip = htonl(inet_addr(tsPrivateIp));
|
||||
|
||||
pVgroupInfo->ipAddr.port = htons(0); // todo fix it
|
||||
pVgroupInfo->numOfVgroups = htonl(numOfVgroups); // todo fix it
|
||||
int32_t* vgIdList = pVgroupInfo->vgId;
|
||||
|
||||
for(int32_t i = 0; i < numOfVgroups; ++i) {
|
||||
vgIdList[i] = htonl(2); // todo fix it
|
||||
}
|
||||
|
||||
assert(numOfDnodes == 1); // this size is valid only when numOfDnodes equals 1
|
||||
int32_t msgLen = sizeof(SCMSTableVgroupRspMsg) + sizeof(STableDnodeVgroupInfo) + numOfVgroups * sizeof(int32_t);
|
||||
|
||||
int32_t msgLen = sizeof(SSuperTableObj) + htonl(pRsp->numOfDnodes) * sizeof(int32_t);
|
||||
SRpcMsg rpcRsp = {0};
|
||||
rpcRsp.handle = pMsg->thandle;
|
||||
rpcRsp.pCont = pRsp;
|
||||
|
@ -1510,7 +1524,7 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) {
|
|||
} else {
|
||||
pMeta->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].privateIp);
|
||||
}
|
||||
pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId);
|
||||
// pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId);
|
||||
pMeta->vpeerDesc[i].dnodeId = htonl(pVgroup->vnodeGid[i].dnodeId);
|
||||
}
|
||||
pMeta->numOfVpeers = pVgroup->numOfVnodes;
|
||||
|
|
Loading…
Reference in New Issue