Merge pull request #1553 from taosdata/feature/query
[td-98] support iplist
This commit is contained in:
commit
d860ec1b86
|
@ -351,7 +351,7 @@ typedef struct SSqlObj {
|
|||
char * sqlstr;
|
||||
char retry;
|
||||
char maxRetry;
|
||||
SRpcIpSet *ipList;
|
||||
SRpcIpSet ipList;
|
||||
char freed : 4;
|
||||
char listed : 4;
|
||||
tsem_t rspSem;
|
||||
|
|
|
@ -209,7 +209,6 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
|||
}
|
||||
|
||||
int32_t code = tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo);
|
||||
// assert(pQueryInfo->numOfTables == 0);
|
||||
|
||||
STableMetaInfo* pTableMetaInfo = NULL;
|
||||
if (pQueryInfo->numOfTables == 0) {
|
||||
|
|
|
@ -169,17 +169,27 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
|
|||
}
|
||||
|
||||
int tscSendMsgToServer(SSqlObj *pSql) {
|
||||
char *pMsg = rpcMallocCont(pSql->cmd.payloadLen);
|
||||
SSqlCmd* pCmd = &pSql->cmd;
|
||||
|
||||
char *pMsg = rpcMallocCont(pCmd->payloadLen);
|
||||
if (NULL == pMsg) {
|
||||
tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]);
|
||||
return TSDB_CODE_CLI_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pSql->ipList->ip[0] = inet_addr(tsPrivateIp);
|
||||
|
||||
if (pSql->cmd.command < TSDB_SQL_MGMT) {
|
||||
pSql->ipList->port = tsDnodeShellPort;
|
||||
tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port);
|
||||
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
||||
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
||||
|
||||
pSql->ipList.numOfIps = pTableMeta->numOfVpeers;
|
||||
pSql->ipList.port = tsDnodeShellPort;
|
||||
pSql->ipList.inUse = 0;
|
||||
|
||||
for(int32_t i = 0; i < pTableMeta->numOfVpeers; ++i) {
|
||||
pSql->ipList.ip[i] = pTableMeta->vpeerDesc[i].ip;
|
||||
}
|
||||
|
||||
tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList.port);
|
||||
memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen);
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
|
@ -189,10 +199,12 @@ int tscSendMsgToServer(SSqlObj *pSql) {
|
|||
.handle = pSql,
|
||||
.code = 0
|
||||
};
|
||||
rpcSendRequest(pVnodeConn, pSql->ipList, &rpcMsg);
|
||||
rpcSendRequest(pVnodeConn, &pSql->ipList, &rpcMsg);
|
||||
} else {
|
||||
pSql->ipList->port = tsMnodeShellPort;
|
||||
tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port);
|
||||
pSql->ipList = tscMgmtIpList;
|
||||
pSql->ipList.port = tsMnodeShellPort;
|
||||
|
||||
tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList.port);
|
||||
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
|
||||
SRpcMsg rpcMsg = {
|
||||
.msgType = pSql->cmd.msgType,
|
||||
|
@ -201,7 +213,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
|
|||
.handle = pSql,
|
||||
.code = 0
|
||||
};
|
||||
rpcSendRequest(pTscMgmtConn, pSql->ipList, &rpcMsg);
|
||||
rpcSendRequest(pTscMgmtConn, &pSql->ipList, &rpcMsg);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -254,11 +266,15 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
|
|||
rpcFreeCont(rpcMsg->pCont);
|
||||
return;
|
||||
} else {
|
||||
tscTrace("%p it shall renew meter meta, code:%d", pSql, tstrerror(rpcMsg->code));
|
||||
tscTrace("%p it shall renew table meta, code:%d", pSql, tstrerror(rpcMsg->code));
|
||||
|
||||
pSql->maxRetry = TSDB_VNODES_SUPPORT * 2;
|
||||
pSql->res.code = rpcMsg->code; // keep the previous error code
|
||||
|
||||
if (++pSql->retry > pSql->maxRetry) {
|
||||
tscError("%p max retry %d reached, ", pSql, pSql->retry);
|
||||
return;
|
||||
}
|
||||
|
||||
rpcMsg->code = tscRenewMeterMeta(pSql, pTableMetaInfo->name);
|
||||
|
||||
if (pTableMetaInfo->pTableMeta) {
|
||||
|
@ -405,7 +421,7 @@ int tscProcessSql(SSqlObj *pSql) {
|
|||
}
|
||||
|
||||
// temp
|
||||
pSql->ipList = &tscMgmtIpList;
|
||||
// pSql->ipList = tscMgmtIpList;
|
||||
// if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
|
||||
// pSql->index = pTableMetaInfo->pTableMeta->index;
|
||||
// } else { // it must be the parent SSqlObj for super table query
|
||||
|
@ -417,7 +433,7 @@ int tscProcessSql(SSqlObj *pSql) {
|
|||
// }
|
||||
// }
|
||||
} else if (pSql->cmd.command < TSDB_SQL_LOCAL) {
|
||||
pSql->ipList = &tscMgmtIpList;
|
||||
pSql->ipList = tscMgmtIpList;
|
||||
} else { // local handler
|
||||
return (*tscProcessMsgRsp[pCmd->command])(pSql);
|
||||
}
|
||||
|
@ -532,9 +548,9 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
|
||||
pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted
|
||||
|
||||
// pSql->cmd.payloadLen is set during copying data into paylaod
|
||||
// pSql->cmd.payloadLen is set during copying data into payload
|
||||
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
|
||||
tscTrace("%p build submit msg, vgId:%d numOfVnodes:%d", pSql, pTableMeta->vgId, htons(pMsgDesc->numOfVnodes));
|
||||
tscTrace("%p build submit msg, vgId:%d numOfVnodes:%d", pSql, pTableMeta->vgId, htonl(pMsgDesc->numOfVnodes));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1837,6 +1853,8 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
|
|||
|
||||
for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
|
||||
pMetaMsg->vpeerDesc[i].vgId = htonl(pMetaMsg->vpeerDesc[i].vgId);
|
||||
pMetaMsg->vpeerDesc[i].ip = htonl(pMetaMsg->vpeerDesc[i].ip);
|
||||
pMetaMsg->vpeerDesc[i].dnodeId = htonl(pMetaMsg->vpeerDesc[i].dnodeId);
|
||||
}
|
||||
|
||||
SSchema* pSchema = pMetaMsg->schema;
|
||||
|
@ -2436,7 +2454,7 @@ static void tscWaitingForCreateTable(SSqlCmd *pCmd) {
|
|||
int tscRenewMeterMeta(SSqlObj *pSql, char *tableId) {
|
||||
int code = 0;
|
||||
|
||||
// handle metric meta renew process
|
||||
// handle table meta renew process
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
|
||||
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||
|
|
|
@ -860,12 +860,10 @@ int tscAllocPayload(SSqlCmd* pCmd, int size) {
|
|||
pCmd->allocSize = size;
|
||||
}
|
||||
|
||||
memset(pCmd->payload, 0, pCmd->payloadLen);
|
||||
memset(pCmd->payload, 0, pCmd->allocSize);
|
||||
}
|
||||
|
||||
//memset(pCmd->payload, 0, pCmd->allocSize);
|
||||
assert(pCmd->allocSize >= size);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -1508,9 +1508,9 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) {
|
|||
|
||||
for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
|
||||
if (usePublicIp) {
|
||||
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp;
|
||||
pMeta->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].publicIp);
|
||||
} else {
|
||||
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].privateIp;
|
||||
pMeta->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].privateIp);
|
||||
}
|
||||
pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId);
|
||||
pMeta->vpeerDesc[i].dnodeId = htonl(pVgroup->vnodeGid[i].dnodeId);
|
||||
|
|
|
@ -2480,12 +2480,11 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBl
|
|||
}
|
||||
|
||||
if (r == BLK_DATA_NO_NEEDED) {
|
||||
// qTrace("QInfo:%p vid:%d sid:%d id:%s, slot:%d, data block ignored, brange:%" PRId64 "-%" PRId64 ",
|
||||
// rows:%d", GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot,
|
||||
// pBlock->keyFirst, pBlock->keyLast, pBlock->numOfPoints);
|
||||
qTrace("QInfo:%p slot:%d, data block ignored, brange:%" PRId64 "-%" PRId64 ", rows:%d",
|
||||
GET_QINFO_ADDR(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->size);
|
||||
} else if (r == BLK_DATA_FILEDS_NEEDED) {
|
||||
if (tsdbRetrieveDataBlockStatisInfo(pRuntimeEnv->pQueryHandle, pStatis) != TSDB_CODE_SUCCESS) {
|
||||
// return DISK_DATA_LOAD_FAILED;
|
||||
// return DISK_DATA_LOAD_FAILED;
|
||||
}
|
||||
|
||||
if (*pStatis == NULL) {
|
||||
|
@ -5909,14 +5908,6 @@ static void freeQInfo(SQInfo *pQInfo) {
|
|||
tfree(pQuery->sdata[col]);
|
||||
}
|
||||
|
||||
// for (int col = 0; col < pQuery->numOfCols; ++col) {
|
||||
// vnodeFreeColumnInfo(&pQuery->colList[col].data);
|
||||
// }
|
||||
//
|
||||
// if (pQuery->colList[0].colIdx != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
|
||||
// tfree(pQuery->tsData);
|
||||
// }
|
||||
|
||||
sem_destroy(&(pQInfo->dataReady));
|
||||
teardownQueryRuntimeEnv(&pQInfo->runtimeEnv);
|
||||
|
||||
|
@ -6126,9 +6117,6 @@ void qTableQuery(SQInfo *pQInfo) {
|
|||
|
||||
dTrace("QInfo:%p query task is launched", pQInfo);
|
||||
|
||||
// sem_post(&pQInfo->dataReady);
|
||||
// pQInfo->runtimeEnv.pQuery->status = QUERY_OVER;
|
||||
|
||||
int32_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList);
|
||||
if (numOfTables == 1) {
|
||||
singleTableQueryImpl(pQInfo);
|
||||
|
|
|
@ -1097,6 +1097,10 @@ static void rpcProcessConnError(void *param, void *id) {
|
|||
SRpcInfo *pRpc = pContext->pRpc;
|
||||
SRpcMsg rpcMsg;
|
||||
|
||||
if (pRpc == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
tTrace("%s connection error happens", pRpc->label);
|
||||
|
||||
if ( pContext->numOfTry >= pContext->ipSet.numOfIps ) {
|
||||
|
|
|
@ -37,10 +37,6 @@ typedef struct SField {
|
|||
// todo need the definition
|
||||
} SField;
|
||||
|
||||
typedef struct SHeaderFileInfo {
|
||||
int32_t fileId;
|
||||
} SHeaderFileInfo;
|
||||
|
||||
typedef struct SQueryFilePos {
|
||||
int32_t fid;
|
||||
int32_t slot;
|
||||
|
@ -380,11 +376,12 @@ static bool loadQualifiedDataFromFileBlock(STsdbQueryHandle *pQueryHandle) {
|
|||
|
||||
SArray *sa = getDefaultLoadColumns(pQueryHandle, true);
|
||||
if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) {
|
||||
|
||||
// query ended in current block
|
||||
if (pQueryHandle->window.ekey < pBlock->keyLast) {
|
||||
doLoadDataFromFileBlock(pQueryHandle);
|
||||
filterDataInDataBlock(pQueryHandle, pCheckInfo->pDataCols, sa);
|
||||
} else { // the whole block is loaded in to buffer
|
||||
pQueryHandle->realNumOfRows = pBlock->numOfPoints;
|
||||
}
|
||||
} else {// todo desc query
|
||||
if (pQueryHandle->window.ekey > pBlock->keyFirst) {
|
||||
|
@ -932,10 +929,6 @@ SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList
|
|||
STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
|
||||
|
||||
if (pHandle->cur.fid < 0) {
|
||||
|
||||
|
||||
|
||||
|
||||
return pHandle->pColumns;
|
||||
} else {
|
||||
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
|
||||
|
|
Loading…
Reference in New Issue