commit
6d8641f387
|
@ -1238,22 +1238,11 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
|
|||
goto _clean;
|
||||
}
|
||||
|
||||
// submit to more than one vnode
|
||||
if (pCmd->pDataBlocks->nSize > 0) {
|
||||
// merge according to vgId
|
||||
if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) {
|
||||
goto _error_clean;
|
||||
}
|
||||
|
||||
STableDataBlocks *pDataBlock = pCmd->pDataBlocks->pData[0];
|
||||
if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) {
|
||||
goto _error_clean;
|
||||
}
|
||||
|
||||
pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
|
||||
|
||||
// set the next sent data vnode index in data block arraylist
|
||||
pTableMetaInfo->vnodeIndex = 1;
|
||||
} else {
|
||||
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
|
||||
}
|
||||
|
|
|
@ -1489,11 +1489,9 @@ static int32_t setExprInfoForFunctions(SQueryInfo* pQueryInfo, SSchema* pSchema,
|
|||
SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, resColIdx, functionID, pColIndex, type, bytes, bytes);
|
||||
strncpy(pExpr->aliasName, columnName, tListLen(pExpr->aliasName));
|
||||
|
||||
// for point interpolation/last_row query, we need the timestamp column to be loaded
|
||||
// for all querie, the timestamp column meeds to be loaded
|
||||
SColumnIndex index = {.tableIndex = pColIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
|
||||
if (functionID == TSDB_FUNC_INTERP || functionID == TSDB_FUNC_LAST_ROW) {
|
||||
tscColumnBaseInfoInsert(pQueryInfo, &index);
|
||||
}
|
||||
tscColumnBaseInfoInsert(pQueryInfo, &index);
|
||||
|
||||
SColumnList ids = getColumnList(1, pColIndex->tableIndex, pColIndex->columnIndex);
|
||||
insertResultField(pQueryInfo, resColIdx, &ids, bytes, type, columnName, pExpr);
|
||||
|
@ -1581,7 +1579,10 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
|
|||
tscColumnBaseInfoInsert(pQueryInfo, &(ids.ids[i]));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
|
||||
tscColumnBaseInfoInsert(pQueryInfo, &tsCol);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
case TK_SUM:
|
||||
|
@ -1689,7 +1690,10 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
|
|||
tscColumnBaseInfoInsert(pQueryInfo, &(ids.ids[i]));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
|
||||
tscColumnBaseInfoInsert(pQueryInfo, &tsCol);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
case TK_FIRST:
|
||||
|
@ -1708,7 +1712,6 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
|
|||
}
|
||||
|
||||
/* in first/last function, multiple columns can be add to resultset */
|
||||
|
||||
for (int32_t i = 0; i < pItem->pNode->pParam->nExpr; ++i) {
|
||||
tSQLExprItem* pParamElem = &(pItem->pNode->pParam->a[i]);
|
||||
if (pParamElem->pNode->nSQLOptr != TK_ALL && pParamElem->pNode->nSQLOptr != TK_ID) {
|
||||
|
@ -1753,7 +1756,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else { // select * from xxx
|
||||
int32_t numOfFields = 0;
|
||||
|
@ -1773,6 +1776,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
|
|||
numOfFields += tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
|
||||
}
|
||||
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
@ -1891,6 +1895,8 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
|
|||
default:
|
||||
return TSDB_CODE_INVALID_SQL;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
// todo refactor
|
||||
|
|
|
@ -341,14 +341,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
|
|||
(*pSql->fp)(pSql->param, taosres, rpcMsg->code);
|
||||
|
||||
if (shouldFree) {
|
||||
// If it is failed, all objects allocated during execution taos_connect_a should be released
|
||||
if (command == TSDB_SQL_CONNECT) {
|
||||
taos_close(pObj);
|
||||
tscTrace("%p Async sql close failed connection", pSql);
|
||||
} else {
|
||||
tscFreeSqlObj(pSql);
|
||||
tscTrace("%p Async sql is automatically freed", pSql);
|
||||
}
|
||||
tscFreeSqlObj(pSql);
|
||||
tscTrace("%p Async sql is automatically freed", pSql);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -594,11 +594,7 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) {
|
|||
}
|
||||
|
||||
if (numOfTableHasRes >= 2) { // do merge result
|
||||
|
||||
success = (doSetResultRowData(pSql->pSubs[0]) != NULL) && (doSetResultRowData(pSql->pSubs[1]) != NULL);
|
||||
// TSKEY key1 = *(TSKEY *)pRes1->tsrow[0];
|
||||
// TSKEY key2 = *(TSKEY *)pRes2->tsrow[0];
|
||||
// printf("first:%" PRId64 ", second:%" PRId64 "\n", key1, key2);
|
||||
} else { // only one subquery
|
||||
SSqlObj *pSub = pSql->pSubs[0];
|
||||
if (pSub == NULL) {
|
||||
|
@ -674,14 +670,13 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
|||
SSqlRes *pRes = &pSql->res;
|
||||
|
||||
if (pRes->qhandle == 0 ||
|
||||
pRes->completed ||
|
||||
pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
|
||||
pCmd->command == TSDB_SQL_INSERT) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// current data are exhausted, fetch more data
|
||||
if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows &&
|
||||
if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && pRes->completed != true &&
|
||||
(pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_METRIC || pCmd->command == TSDB_SQL_FETCH))) {
|
||||
taos_fetch_rows_a(res, asyncFetchCallback, pSql->pTscObj);
|
||||
|
||||
|
|
|
@ -504,7 +504,6 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
|
|||
}
|
||||
|
||||
tsem_init(&pSql->rspSem, 0, 0);
|
||||
tsem_init(&pSql->emptyRspSem, 0, 1);
|
||||
|
||||
SSqlInfo SQLInfo = {0};
|
||||
tSQLParse(&SQLInfo, pSql->sqlstr);
|
||||
|
|
|
@ -423,9 +423,6 @@ void tscFreeResData(SSqlObj* pSql) {
|
|||
}
|
||||
|
||||
void tscFreeSqlResult(SSqlObj* pSql) {
|
||||
//TODO not free
|
||||
return;
|
||||
|
||||
tfree(pSql->res.pRsp);
|
||||
pSql->res.row = 0;
|
||||
pSql->res.numOfRows = 0;
|
||||
|
@ -469,8 +466,6 @@ void tscFreeSqlObjPartial(SSqlObj* pSql) {
|
|||
tscFreeSqlCmdData(pCmd);
|
||||
|
||||
tscTrace("%p free sqlObj partial completed", pSql);
|
||||
|
||||
tscFreeSqlCmdData(pCmd);
|
||||
}
|
||||
|
||||
void tscFreeSqlObj(SSqlObj* pSql) {
|
||||
|
@ -489,10 +484,7 @@ void tscFreeSqlObj(SSqlObj* pSql) {
|
|||
|
||||
pCmd->allocSize = 0;
|
||||
|
||||
if (pSql->fp == NULL) {
|
||||
tsem_destroy(&pSql->rspSem);
|
||||
tsem_destroy(&pSql->emptyRspSem);
|
||||
}
|
||||
tsem_destroy(&pSql->rspSem);
|
||||
free(pSql);
|
||||
}
|
||||
|
||||
|
@ -721,7 +713,7 @@ static void trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) {
|
|||
|
||||
char* p = pTableDataBlock->pData + sizeof(SSubmitBlk);
|
||||
|
||||
SSubmitBlk* pBlock = pTableDataBlock->pData;
|
||||
SSubmitBlk* pBlock = (SSubmitBlk*) pTableDataBlock->pData;
|
||||
int32_t rows = htons(pBlock->numOfRows);
|
||||
|
||||
for(int32_t i = 0; i < rows; ++i) {
|
||||
|
@ -1751,16 +1743,8 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) {
|
|||
}
|
||||
|
||||
int32_t command = pSql->cmd.command;
|
||||
if (pTscObj->pSql == pSql) {
|
||||
/*
|
||||
* in case of taos_connect_a query, the object should all be released, even it is the
|
||||
* master sql object. Otherwise, the master sql should not be released
|
||||
*/
|
||||
if (command == TSDB_SQL_CONNECT && pSql->res.code != TSDB_CODE_SUCCESS) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
if (command == TSDB_SQL_CONNECT) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (command == TSDB_SQL_INSERT) {
|
||||
|
|
|
@ -176,10 +176,10 @@ static void *dnodeProcessReadQueue(void *param) {
|
|||
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||
}
|
||||
|
||||
dnodeProcessReadResult(pVnode, pReadMsg);
|
||||
// dnodeProcessReadResult(pVnode, pReadMsg);
|
||||
taosFreeQitem(pReadMsg);
|
||||
|
||||
dnodeReleaseVnode(pVnode);
|
||||
dnodeReleaseVnode(pVnode);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
|
@ -220,7 +220,7 @@ static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead) {
|
|||
code = terrno;
|
||||
}
|
||||
|
||||
//TODO: query handle is returned by dnodeProcessQueryMsg
|
||||
//TODO: query handle is returned by dnodeProcessQueryMsg
|
||||
if (0) {
|
||||
SRpcMsg rsp;
|
||||
rsp.handle = pRead->rpcMsg.handle;
|
||||
|
@ -232,47 +232,67 @@ static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead) {
|
|||
rpcFreeCont(pRead->rpcMsg.pCont); // free the received message
|
||||
}
|
||||
|
||||
static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) {
|
||||
|
||||
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
|
||||
pRead->rpcMsg = pMsg->rpcMsg;
|
||||
pRead->pCont = qhandle;
|
||||
pRead->contLen = 0;
|
||||
pRead->pRpcContext = pMsg->pRpcContext;
|
||||
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
|
||||
|
||||
taos_queue queue = dnodeGetVnodeRworker(pVnode);
|
||||
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
|
||||
|
||||
// SReadMsg readMsg = {
|
||||
// .rpcMsg = {0},
|
||||
// .pCont = qhandle,
|
||||
// .contLen = 0,
|
||||
// .pRpcContext = pMsg->pRpcContext,
|
||||
// };
|
||||
//
|
||||
// taos_queue queue = dnodeGetVnodeRworker(pVnode);
|
||||
// taosWriteQitem(queue, TSDB_MSG_TYPE_QUERY, &readMsg);
|
||||
}
|
||||
|
||||
static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg) {
|
||||
SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pMsg->pCont;
|
||||
|
||||
SQInfo* pQInfo = NULL;
|
||||
void* tsdb = dnodeGetVnodeTsdb(pVnode);
|
||||
int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo);
|
||||
if (pMsg->contLen != 0) {
|
||||
void* tsdb = dnodeGetVnodeTsdb(pVnode);
|
||||
int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, NULL, &pQInfo);
|
||||
|
||||
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
|
||||
pRsp->code = code;
|
||||
pRsp->qhandle = htobe64((uint64_t) (pQInfo));
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
.handle = pMsg->rpcMsg.handle,
|
||||
.pCont = pRsp,
|
||||
.contLen = sizeof(SQueryTableRsp),
|
||||
.code = code,
|
||||
.msgType = 0
|
||||
};
|
||||
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
|
||||
pRsp->code = code;
|
||||
pRsp->qhandle = htobe64((uint64_t) (pQInfo));
|
||||
|
||||
rpcSendResponse(&rpcRsp);
|
||||
SRpcMsg rpcRsp = {
|
||||
.handle = pMsg->rpcMsg.handle,
|
||||
.pCont = pRsp,
|
||||
.contLen = sizeof(SQueryTableRsp),
|
||||
.code = code,
|
||||
.msgType = 0
|
||||
};
|
||||
|
||||
// do execute query
|
||||
qTableQuery(pQInfo);
|
||||
rpcSendResponse(&rpcRsp);
|
||||
} else {
|
||||
pQInfo = pMsg->pCont;
|
||||
}
|
||||
|
||||
qTableQuery(pQInfo); // do execute query
|
||||
}
|
||||
|
||||
static int32_t c = 0;
|
||||
static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) {
|
||||
SRetrieveTableMsg *pRetrieve = pMsg->pCont;
|
||||
void *pQInfo = (void*) htobe64(pRetrieve->qhandle);
|
||||
|
||||
dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId);
|
||||
if ((++c)%2 == 0) {
|
||||
int32_t k = 1;
|
||||
}
|
||||
int32_t rowSize = 0;
|
||||
int32_t numOfRows = 0;
|
||||
int32_t contLen = 0;
|
||||
|
||||
SRetrieveTableRsp *pRsp = NULL;
|
||||
|
||||
int32_t code = qRetrieveQueryResultInfo(pQInfo, &numOfRows, &rowSize);
|
||||
int32_t code = qRetrieveQueryResultInfo(pQInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
contLen = sizeof(SRetrieveTableRsp);
|
||||
|
||||
|
@ -281,6 +301,12 @@ static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) {
|
|||
} else {
|
||||
// todo check code and handle error in build result set
|
||||
code = qDumpRetrieveResult(pQInfo, &pRsp, &contLen);
|
||||
|
||||
if (qHasMoreResultsToRetrieve(pQInfo)) {
|
||||
dnodeContinueExecuteQuery(pVnode, pQInfo, pMsg);
|
||||
} else { // no further execution invoked, release the ref to vnode
|
||||
dnodeProcessReadResult(pVnode, pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
SRpcMsg rpcRsp = (SRpcMsg) {
|
||||
|
@ -292,7 +318,4 @@ static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) {
|
|||
};
|
||||
|
||||
rpcSendResponse(&rpcRsp);
|
||||
|
||||
//todo merge result should be done here
|
||||
//dnodeProcessReadResult(&readMsg);
|
||||
}
|
||||
|
|
|
@ -68,8 +68,10 @@ typedef struct SWindowResult {
|
|||
} SWindowResult;
|
||||
|
||||
typedef struct SResultRec {
|
||||
int64_t pointsTotal;
|
||||
int64_t pointsRead;
|
||||
int64_t total;
|
||||
int64_t size;
|
||||
int64_t capacity;
|
||||
int32_t threshold; // the threshold size, when the number of rows in result buffer, return to client
|
||||
} SResultRec;
|
||||
|
||||
typedef struct SWindowResInfo {
|
||||
|
@ -112,7 +114,7 @@ typedef struct STableQueryInfo {
|
|||
|
||||
typedef struct STableDataInfo {
|
||||
int32_t numOfBlocks;
|
||||
int32_t start; // start block index
|
||||
int32_t start; // start block index
|
||||
int32_t tableIndex;
|
||||
void* pMeterObj;
|
||||
int32_t groupIdx; // group id in table list
|
||||
|
@ -143,7 +145,6 @@ typedef struct SQuery {
|
|||
int32_t pos;
|
||||
int64_t pointsOffset; // the number of points offset to save read data
|
||||
SData** sdata;
|
||||
int32_t capacity;
|
||||
SSingleColumnFilterInfo* pFilterInfo;
|
||||
} SQuery;
|
||||
|
||||
|
@ -171,15 +172,13 @@ typedef struct SQueryRuntimeEnv {
|
|||
|
||||
typedef struct SQInfo {
|
||||
void* signature;
|
||||
void* pVnode;
|
||||
// void* param; // pointer to the RpcReadMsg
|
||||
TSKEY startTime;
|
||||
TSKEY elapsedTime;
|
||||
SResultRec rec;
|
||||
int32_t pointsInterpo;
|
||||
int32_t code; // error code to returned to client
|
||||
// int32_t killed; // denotes if current query is killed
|
||||
int32_t code; // error code to returned to client
|
||||
sem_t dataReady;
|
||||
SArray* pTableIdList; // table list
|
||||
SArray* pTableIdList; // table id list
|
||||
SQueryRuntimeEnv runtimeEnv;
|
||||
int32_t subgroupIdx;
|
||||
int32_t offset; /* offset in group result set of subgroup */
|
||||
|
@ -204,7 +203,7 @@ typedef struct SQInfo {
|
|||
* @param pQInfo
|
||||
* @return
|
||||
*/
|
||||
int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo);
|
||||
int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, void* param, SQInfo** pQInfo);
|
||||
|
||||
/**
|
||||
* query on single table
|
||||
|
@ -222,7 +221,7 @@ void qSuperTableQuery(void* pReadMsg);
|
|||
* wait for the query completed, and retrieve final results to client
|
||||
* @param pQInfo
|
||||
*/
|
||||
int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* rowsize);
|
||||
int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo);
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -232,4 +231,11 @@ int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* ro
|
|||
*/
|
||||
int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* contLen);
|
||||
|
||||
/**
|
||||
*
|
||||
* @param pQInfo
|
||||
* @return
|
||||
*/
|
||||
bool qHasMoreResultsToRetrieve(SQInfo* pQInfo);
|
||||
|
||||
#endif // TDENGINE_QUERYEXECUTOR_H
|
||||
|
|
|
@ -53,9 +53,9 @@
|
|||
|
||||
/* get the qinfo struct address from the query struct address */
|
||||
#define GET_COLUMN_BYTES(query, colidx) \
|
||||
((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].info.bytes)
|
||||
((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdx].info.bytes)
|
||||
#define GET_COLUMN_TYPE(query, colidx) \
|
||||
((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].info.type)
|
||||
((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdx].info.type)
|
||||
|
||||
typedef struct SPointInterpoSupporter {
|
||||
int32_t numOfCols;
|
||||
|
@ -364,8 +364,8 @@ bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].pBase.functio
|
|||
bool doRevisedResultsByLimit(SQInfo *pQInfo) {
|
||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||
|
||||
if ((pQuery->limit.limit > 0) && (pQuery->rec.pointsRead + pQInfo->rec.pointsRead > pQuery->limit.limit)) {
|
||||
pQuery->rec.pointsRead = pQuery->limit.limit - pQInfo->rec.pointsRead;
|
||||
if ((pQuery->limit.limit > 0) && (pQuery->rec.size + pQuery->rec.size > pQuery->limit.limit)) {
|
||||
pQuery->rec.size = pQuery->limit.limit - pQuery->rec.size;
|
||||
|
||||
// query completed
|
||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||
|
@ -1344,17 +1344,16 @@ static int32_t reviseForwardSteps(SQueryRuntimeEnv *pRuntimeEnv, int32_t forward
|
|||
static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pDataBlockInfo,
|
||||
SDataStatis *pStatis, __block_search_fn_t searchFn, int32_t *numOfRes,
|
||||
SWindowResInfo *pWindowResInfo, SArray *pDataBlock) {
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
|
||||
*numOfRes = rowwiseApplyAllFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock);
|
||||
} else {
|
||||
*numOfRes = blockwiseApplyAllFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock);
|
||||
}
|
||||
|
||||
|
||||
TSKEY lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey;
|
||||
pQuery->lastKey = lastKey + step;
|
||||
pQuery->lastKey = lastKey + GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||
|
||||
doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo);
|
||||
|
||||
|
@ -1368,12 +1367,8 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
|
|||
assert(*numOfRes >= 0);
|
||||
|
||||
// check if buffer is large enough for accommodating all qualified points
|
||||
if (*numOfRes > 0 && pQuery->checkBufferInLoop == 1) {
|
||||
pQuery->pointsOffset -= *numOfRes;
|
||||
if (pQuery->pointsOffset <= 0) { // todo return correct numOfRes for ts_comp function
|
||||
pQuery->pointsOffset = 0;
|
||||
setQueryStatus(pQuery, QUERY_RESBUF_FULL);
|
||||
}
|
||||
if (*numOfRes > 0 && pQuery->checkBufferInLoop == 1 && ((*numOfRes) >= pQuery->rec.threshold)) {
|
||||
setQueryStatus(pQuery, QUERY_RESBUF_FULL);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -1498,16 +1493,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, SColumnModel
|
|||
SColIndexEx * pColIndexEx = &pSqlFuncMsg->colInfo;
|
||||
|
||||
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
|
||||
|
||||
if (TSDB_COL_IS_TAG(pSqlFuncMsg->colInfo.flag)) { // process tag column info
|
||||
SSchema *pSchema = getColumnModelSchema(pTagsSchema, pColIndexEx->colIdx);
|
||||
|
||||
pCtx->inputType = pSchema->type;
|
||||
pCtx->inputBytes = pSchema->bytes;
|
||||
} else {
|
||||
pCtx->inputType = GET_COLUMN_TYPE(pQuery, i);
|
||||
pCtx->inputBytes = GET_COLUMN_BYTES(pQuery, i);
|
||||
}
|
||||
pCtx->inputType = GET_COLUMN_TYPE(pQuery, i);
|
||||
pCtx->inputBytes = GET_COLUMN_BYTES(pQuery, i);
|
||||
|
||||
pCtx->ptsOutputBuf = NULL;
|
||||
|
||||
|
@ -1607,7 +1594,8 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf);
|
||||
}
|
||||
|
||||
static bool isQueryKilled(SQuery *pQuery) {
|
||||
static bool isQueryKilled(SQInfo *pQInfo) {
|
||||
return (pQInfo->code == TSDB_CODE_QUERY_CANCELLED);
|
||||
#if 0
|
||||
/*
|
||||
* check if the queried meter is going to be deleted.
|
||||
|
@ -1621,8 +1609,6 @@ static bool isQueryKilled(SQuery *pQuery) {
|
|||
|
||||
return (pQInfo->killed == 1);
|
||||
#endif
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static bool setQueryKilled(SQInfo* pQInfo) {
|
||||
|
@ -1891,8 +1877,6 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) {
|
|||
|
||||
pQuery->checkBufferInLoop = hasMultioutput ? 1 : 0;
|
||||
}
|
||||
|
||||
// pQuery->pointsOffset = pQuery->pointsToRead;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -2313,7 +2297,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
|
|||
|
||||
pQuery->status = 0;
|
||||
|
||||
pQInfo->rec = (SResultRec){0};
|
||||
pQuery->rec = (SResultRec){0};
|
||||
pQuery->rec = (SResultRec){0};
|
||||
|
||||
changeExecuteScanOrder(pQuery, true);
|
||||
|
@ -2552,7 +2536,7 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBl
|
|||
// return DISK_DATA_LOAD_FAILED;
|
||||
}
|
||||
|
||||
if (pStatis == NULL) {
|
||||
if (*pStatis == NULL) {
|
||||
pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL);
|
||||
}
|
||||
} else {
|
||||
|
@ -2651,7 +2635,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
|
||||
while (tsdbNextDataBlock(pQueryHandle)) {
|
||||
// check if query is killed or not set the status of query to pass the status check
|
||||
if (isQueryKilled(pQuery)) {
|
||||
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
|
||||
return cnt;
|
||||
}
|
||||
|
||||
|
@ -2679,9 +2663,9 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
}
|
||||
|
||||
int32_t numOfRes = 0;
|
||||
|
||||
SDataStatis *pStatis = NULL;
|
||||
SArray * pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, &blockInfo, &pStatis);
|
||||
|
||||
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, &blockInfo, &pStatis);
|
||||
int32_t forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, &numOfRes,
|
||||
&pRuntimeEnv->windowResInfo, pDataBlock);
|
||||
|
||||
|
@ -3046,9 +3030,9 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
|
|||
offset += pData->numOfElems;
|
||||
}
|
||||
|
||||
assert(pQuery->rec.pointsRead == 0);
|
||||
assert(pQuery->rec.size == 0);
|
||||
|
||||
pQuery->rec.pointsRead += rows;
|
||||
pQuery->rec.size += rows;
|
||||
pQInfo->offset += 1;
|
||||
}
|
||||
|
||||
|
@ -3378,7 +3362,7 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf;
|
||||
}
|
||||
|
||||
memset(pQuery->sdata[i]->data, 0, (size_t)pQuery->pSelectExpr[i].resBytes * pQuery->capacity);
|
||||
memset(pQuery->sdata[i]->data, 0, (size_t)pQuery->pSelectExpr[i].resBytes * pQuery->rec.capacity);
|
||||
}
|
||||
|
||||
initCtxOutputBuf(pRuntimeEnv);
|
||||
|
@ -3425,14 +3409,14 @@ void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
|
||||
void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
if (pQuery->rec.pointsRead == 0 || pQuery->limit.offset == 0) {
|
||||
if (pQuery->rec.size == 0 || pQuery->limit.offset == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (pQuery->rec.pointsRead <= pQuery->limit.offset) {
|
||||
pQuery->limit.offset -= pQuery->rec.pointsRead;
|
||||
if (pQuery->rec.size <= pQuery->limit.offset) {
|
||||
pQuery->limit.offset -= pQuery->rec.size;
|
||||
|
||||
pQuery->rec.pointsRead = 0;
|
||||
pQuery->rec.size = 0;
|
||||
// pQuery->pointsOffset = pQuery->rec.pointsToRead; // clear all data in result buffer
|
||||
|
||||
resetCtxOutputBuf(pRuntimeEnv);
|
||||
|
@ -3441,13 +3425,13 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
pQuery->status &= (~QUERY_RESBUF_FULL);
|
||||
} else {
|
||||
int32_t numOfSkip = (int32_t)pQuery->limit.offset;
|
||||
pQuery->rec.pointsRead -= numOfSkip;
|
||||
pQuery->rec.size -= numOfSkip;
|
||||
|
||||
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
|
||||
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
|
||||
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
|
||||
assert(0);
|
||||
// memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->pointsRead * bytes);
|
||||
// memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->size * bytes);
|
||||
pRuntimeEnv->pCtx[i].aOutputBuf += bytes * numOfSkip;
|
||||
|
||||
if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
|
||||
|
@ -3617,7 +3601,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
pRuntimeEnv->scanFlag = REPEAT_SCAN;
|
||||
|
||||
/* check if query is killed or not */
|
||||
if (isQueryKilled(pQuery)) {
|
||||
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
|
||||
// setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
|
||||
return;
|
||||
}
|
||||
|
@ -4010,8 +3994,9 @@ void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResult *result) {
|
|||
int32_t orderType = (pQuery->pGroupbyExpr != NULL) ? pQuery->pGroupbyExpr->orderType : TSQL_SO_ASC;
|
||||
int32_t numOfResult = doCopyToSData(pQInfo, result, orderType);
|
||||
|
||||
pQuery->rec.pointsRead += numOfResult;
|
||||
// assert(pQuery->rec.pointsRead <= pQuery->pointsToRead);
|
||||
pQuery->rec.size += numOfResult;
|
||||
|
||||
assert(pQuery->rec.size <= pQuery->rec.capacity);
|
||||
}
|
||||
|
||||
static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableDataInfo *pTableDataInfo) {
|
||||
|
@ -4049,31 +4034,6 @@ void stableApplyFunctionsOnBlock_(SQInfo *pQInfo, STableDataInfo *pTableDataInfo
|
|||
updatelastkey(pQuery, pTableQueryInfo);
|
||||
}
|
||||
|
||||
// we need to split the refstatsult into different packages.
|
||||
int32_t vnodeGetResultSize(void *thandle, int32_t *numOfRows) {
|
||||
SQInfo *pQInfo = (SQInfo *)thandle;
|
||||
SQuery *pQuery = &pQInfo->runtimeEnv.pQuery;
|
||||
|
||||
/*
|
||||
* get the file size and set the numOfRows to be the file size, since for tsComp query,
|
||||
* the returned row size is equalled to 1
|
||||
*
|
||||
* TODO handle the case that the file is too large to send back one time
|
||||
*/
|
||||
if (isTSCompQuery(pQuery) && (*numOfRows) > 0) {
|
||||
struct stat fstat;
|
||||
if (stat(pQuery->sdata[0]->data, &fstat) == 0) {
|
||||
*numOfRows = fstat.st_size;
|
||||
return fstat.st_size;
|
||||
} else {
|
||||
dError("QInfo:%p failed to get file info, path:%s, reason:%s", pQInfo, pQuery->sdata[0]->data, strerror(errno));
|
||||
return 0;
|
||||
}
|
||||
} else {
|
||||
return pQuery->rowSize * (*numOfRows);
|
||||
}
|
||||
}
|
||||
|
||||
bool vnodeHasRemainResults(void *handle) {
|
||||
SQInfo *pQInfo = (SQInfo *)handle;
|
||||
|
||||
|
@ -4085,7 +4045,7 @@ bool vnodeHasRemainResults(void *handle) {
|
|||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
SInterpolationInfo *pInterpoInfo = &pRuntimeEnv->interpoInfo;
|
||||
if (pQuery->limit.limit > 0 && pQInfo->rec.pointsRead >= pQuery->limit.limit) {
|
||||
if (pQuery->limit.limit > 0 && pQuery->rec.size >= pQuery->limit.limit) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -4158,6 +4118,11 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
|
|||
memmove(data, pQuery->sdata[col]->data, bytes * numOfRows);
|
||||
data += bytes * numOfRows;
|
||||
}
|
||||
|
||||
// all data returned, set query over
|
||||
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
||||
setQueryStatus(pQuery, QUERY_OVER);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage **pDataSrc, int32_t numOfRows,
|
||||
|
@ -4266,8 +4231,6 @@ int32_t initQInfo(SQInfo *pQInfo, void *param, void* tsdb) {
|
|||
setScanLimitationByResultBuffer(pQuery);
|
||||
changeExecuteScanOrder(pQuery, false);
|
||||
|
||||
pQInfo->rec = (SResultRec){0};
|
||||
|
||||
// dataInCache requires lastKey value
|
||||
pQuery->lastKey = pQuery->window.skey;
|
||||
|
||||
|
@ -4404,14 +4367,10 @@ static void queryOnDataBlocks(SQInfo *pQInfo, STableDataInfo *pMeterDataInfo) {
|
|||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
// SMeterObj * pTempMeter = getMeterObj(pSupporter->pMetersHashTable, pSupporter->pMeterSidExtInfo[0]->sid);
|
||||
// __block_search_fn_t searchFn = vnodeSearchKeyFunc[pTempMeter->searchAlgorithm];
|
||||
|
||||
// dTrace("QInfo:%p start to check data blocks in %d files", pQInfo, pVnodeFileInfo->numOfFiles);
|
||||
|
||||
tsdb_query_handle_t *pQueryHandle = pRuntimeEnv->pQueryHandle;
|
||||
while (tsdbNextDataBlock(pQueryHandle)) {
|
||||
if (isQueryKilled(pQuery)) {
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -4550,7 +4509,7 @@ static int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start
|
|||
|
||||
// accumulate the point interpolation result
|
||||
if (numOfRes > 0) {
|
||||
pQuery->rec.pointsRead += numOfRes;
|
||||
pQuery->rec.size += numOfRes;
|
||||
forwardCtxOutputBuf(pRuntimeEnv, numOfRes);
|
||||
}
|
||||
|
||||
|
@ -4593,7 +4552,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
|
|||
pSupporter->meterIdx = start;
|
||||
|
||||
for (int32_t k = start; k <= end; ++k, pSupporter->meterIdx++) {
|
||||
if (isQueryKilled(pQuery)) {
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
|
||||
return;
|
||||
}
|
||||
|
@ -4620,7 +4579,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
|
|||
pSupporter->subgroupIdx);
|
||||
|
||||
for (int32_t k = start; k <= end; ++k) {
|
||||
if (isQueryKilled(pQuery)) {
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
|
||||
return;
|
||||
}
|
||||
|
@ -4638,7 +4597,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
|
|||
pSupporter->subgroupIdx++;
|
||||
|
||||
// output buffer is full, return to client
|
||||
if (pQuery->pointsRead >= pQuery->pointsToRead) {
|
||||
if (pQuery->size >= pQuery->pointsToRead) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -4654,9 +4613,9 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
|
|||
*/
|
||||
if (pSupporter->subgroupIdx > 0) {
|
||||
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
|
||||
pQInfo->pointsRead += pQuery->pointsRead;
|
||||
pQInfo->size += pQuery->size;
|
||||
|
||||
if (pQuery->pointsRead > 0) {
|
||||
if (pQuery->size > 0) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -4671,7 +4630,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
|
|||
while (pSupporter->meterIdx < pSupporter->numOfMeters) {
|
||||
int32_t k = pSupporter->meterIdx;
|
||||
|
||||
if (isQueryKilled(pQuery)) {
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
|
||||
return;
|
||||
}
|
||||
|
@ -4722,7 +4681,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
|
|||
|
||||
vnodeScanAllData(pRuntimeEnv);
|
||||
|
||||
pQuery->pointsRead = getNumOfResult(pRuntimeEnv);
|
||||
pQuery->size = getNumOfResult(pRuntimeEnv);
|
||||
doSkipResults(pRuntimeEnv);
|
||||
|
||||
// the limitation of output result is reached, set the query completed
|
||||
|
@ -4757,7 +4716,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
|
|||
pQuery->skey = pQuery->lastKey;
|
||||
|
||||
// all data in the result buffer are skipped due to the offset, continue to retrieve data from current meter
|
||||
if (pQuery->pointsRead == 0) {
|
||||
if (pQuery->size == 0) {
|
||||
assert(!Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL));
|
||||
continue;
|
||||
} else {
|
||||
|
@ -4804,17 +4763,17 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
|
|||
}
|
||||
|
||||
pQInfo->pTableQuerySupporter->subgroupIdx = 0;
|
||||
pQuery->pointsRead = 0;
|
||||
pQuery->size = 0;
|
||||
copyFromWindowResToSData(pQInfo, pWindowResInfo->pResult);
|
||||
}
|
||||
|
||||
pQInfo->pointsRead += pQuery->pointsRead;
|
||||
pQInfo->size += pQuery->size;
|
||||
pQuery->pointsOffset = pQuery->pointsToRead;
|
||||
|
||||
dTrace(
|
||||
"QInfo %p vid:%d, numOfMeters:%d, index:%d, numOfGroups:%d, %d points returned, totalRead:%d totalReturn:%d,"
|
||||
"next skey:%" PRId64 ", offset:%" PRId64,
|
||||
pQInfo, vid, pTableIdList->numOfTables, pSupporter->meterIdx, pTableIdList->numOfSubSet, pQuery->pointsRead, pQInfo->pointsRead,
|
||||
pQInfo, vid, pTableIdList->numOfTables, pSupporter->meterIdx, pTableIdList->numOfSubSet, pQuery->size, pQInfo->size,
|
||||
pQInfo->pointsReturned, pQuery->skey, pQuery->limit.offset);
|
||||
#endif
|
||||
}
|
||||
|
@ -4926,13 +4885,13 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
|
|||
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
|
||||
}
|
||||
|
||||
pQInfo->rec.pointsRead += pQuery->rec.pointsRead;
|
||||
pQuery->rec.size += pQuery->rec.size;
|
||||
|
||||
if (pQuery->rec.pointsRead == 0) {
|
||||
if (pQuery->rec.size == 0) {
|
||||
// vnodePrintQueryStatistics(pSupporter);
|
||||
}
|
||||
|
||||
dTrace("QInfo:%p current:%lldd, total:%lldd", pQInfo, pQuery->rec.pointsRead, pQInfo->rec.pointsTotal);
|
||||
dTrace("QInfo:%p current:%lldd, total:%lldd", pQInfo, pQuery->rec.size, pQuery->rec.total);
|
||||
return;
|
||||
}
|
||||
#if 0
|
||||
|
@ -4965,7 +4924,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
|
|||
|
||||
doMultiMeterSupplementaryScan(pQInfo);
|
||||
|
||||
if (isQueryKilled(pQuery)) {
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
dTrace("QInfo:%p query killed, abort", pQInfo);
|
||||
return;
|
||||
}
|
||||
|
@ -4985,8 +4944,8 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
|
|||
}
|
||||
|
||||
// handle the limitation of output buffer
|
||||
pQInfo->pointsRead += pQuery->pointsRead;
|
||||
dTrace("QInfo:%p points returned:%d, totalRead:%d totalReturn:%d", pQInfo, pQuery->pointsRead, pQInfo->pointsRead,
|
||||
pQInfo->size += pQuery->size;
|
||||
dTrace("QInfo:%p points returned:%d, totalRead:%d totalReturn:%d", pQInfo, pQuery->size, pQInfo->size,
|
||||
pQInfo->pointsReturned);
|
||||
#endif
|
||||
}
|
||||
|
@ -5004,13 +4963,13 @@ static void tableFixedOutputProcessor(SQInfo *pQInfo) {
|
|||
vnodeScanAllData(pRuntimeEnv);
|
||||
doFinalizeResult(pRuntimeEnv);
|
||||
|
||||
if (isQueryKilled(pQuery)) {
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// since the numOfOutputElems must be identical for all sql functions that are allowed to be executed simutanelously.
|
||||
pQuery->rec.pointsRead = getNumOfResult(pRuntimeEnv);
|
||||
// assert(pQuery->pointsRead <= pQuery->pointsToRead &&
|
||||
pQuery->rec.size = getNumOfResult(pRuntimeEnv);
|
||||
// assert(pQuery->size <= pQuery->pointsToRead &&
|
||||
// Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED));
|
||||
|
||||
// must be top/bottom query if offset > 0
|
||||
|
@ -5021,15 +4980,12 @@ static void tableFixedOutputProcessor(SQInfo *pQInfo) {
|
|||
doSkipResults(pRuntimeEnv);
|
||||
doRevisedResultsByLimit(pQInfo);
|
||||
|
||||
pQInfo->rec.pointsRead = pQuery->rec.pointsRead;
|
||||
pQuery->rec.size = pQuery->rec.size;
|
||||
}
|
||||
|
||||
static void tableMultiOutputProcessor(SQInfo *pQInfo) {
|
||||
#if 0
|
||||
SQuery * pQuery = &pQInfo->query;
|
||||
SMeterObj *pMeterObj = pQInfo->pObj;
|
||||
|
||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->pTableQuerySupporter->runtimeEnv;
|
||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
// for ts_comp query, re-initialized is not allowed
|
||||
if (!isTSCompQuery(pQuery)) {
|
||||
|
@ -5040,63 +4996,52 @@ static void tableMultiOutputProcessor(SQInfo *pQInfo) {
|
|||
vnodeScanAllData(pRuntimeEnv);
|
||||
doFinalizeResult(pRuntimeEnv);
|
||||
|
||||
if (isQueryKilled(pQuery)) {
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
return;
|
||||
}
|
||||
|
||||
pQuery->pointsRead = getNumOfResult(pRuntimeEnv);
|
||||
if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols > 0 && pQuery->pointsRead > 0) {
|
||||
pQuery->rec.size = getNumOfResult(pRuntimeEnv);
|
||||
if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols > 0 && pQuery->rec.size > 0) {
|
||||
doSkipResults(pRuntimeEnv);
|
||||
}
|
||||
|
||||
/*
|
||||
* 1. if pQuery->pointsRead == 0, pQuery->limit.offset >= 0, still need to check data
|
||||
* 2. if pQuery->pointsRead > 0, pQuery->limit.offset must be 0
|
||||
* 1. if pQuery->size == 0, pQuery->limit.offset >= 0, still need to check data
|
||||
* 2. if pQuery->size > 0, pQuery->limit.offset must be 0
|
||||
*/
|
||||
if (pQuery->pointsRead > 0 || Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) {
|
||||
if (pQuery->rec.size > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
||||
break;
|
||||
}
|
||||
|
||||
TSKEY nextTimestamp = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos);
|
||||
assert(nextTimestamp > 0 || ((nextTimestamp < 0) && Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)));
|
||||
|
||||
dTrace("QInfo:%p vid:%d sid:%d id:%s, skip current result, offset:%" PRId64 ", next qrange:%" PRId64 "-%" PRId64,
|
||||
pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->limit.offset, pQuery->lastKey,
|
||||
pQuery->ekey);
|
||||
pQInfo, pQuery->limit.offset, pQuery->lastKey);
|
||||
|
||||
resetCtxOutputBuf(pRuntimeEnv);
|
||||
}
|
||||
|
||||
doRevisedResultsByLimit(pQInfo);
|
||||
pQInfo->pointsRead += pQuery->pointsRead;
|
||||
|
||||
if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) {
|
||||
TSKEY nextTimestamp = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos);
|
||||
assert(nextTimestamp > 0 || ((nextTimestamp < 0) && Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)));
|
||||
|
||||
dTrace("QInfo:%p vid:%d sid:%d id:%s, query abort due to buffer limitation, next qrange:%" PRId64 "-%" PRId64,
|
||||
pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->lastKey, pQuery->ekey);
|
||||
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
|
||||
dTrace("QInfo:%p query paused due to buffer limitation, next qrange:%" PRId64 "-%" PRId64,
|
||||
pQInfo, pQuery->lastKey, pQuery->window.ekey);
|
||||
}
|
||||
|
||||
dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned, totalRead:%d totalReturn:%d", pQInfo, pMeterObj->vnode,
|
||||
pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, pQInfo->pointsRead, pQInfo->pointsReturned);
|
||||
// dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned, totalRead:%d totalReturn:%d", pQInfo, pMeterObj->vnode,
|
||||
// pMeterObj->sid, pMeterObj->meterId, pQuery->size, pQInfo->size, pQInfo->pointsReturned);
|
||||
|
||||
pQuery->pointsOffset = pQuery->pointsToRead; // restore the available buffer
|
||||
if (!isTSCompQuery(pQuery)) {
|
||||
assert(pQuery->pointsRead <= pQuery->pointsToRead);
|
||||
}
|
||||
|
||||
#endif
|
||||
// pQuery->pointsOffset = pQuery->pointsToRead; //restore the available buffer
|
||||
// if (!isTSCompQuery(pQuery)) {
|
||||
// assert(pQuery->size <= pQuery->pointsToRead);
|
||||
// }
|
||||
}
|
||||
|
||||
static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||
static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
while (1) {
|
||||
initCtxOutputBuf(pRuntimeEnv);
|
||||
vnodeScanAllData(pRuntimeEnv);
|
||||
|
||||
if (isQueryKilled(pQuery)) {
|
||||
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -5115,34 +5060,25 @@ static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
pQuery->limit.offset -= c;
|
||||
}
|
||||
|
||||
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
||||
break;
|
||||
}
|
||||
|
||||
// load the data block for the next retrieve
|
||||
// loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos);
|
||||
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
|
||||
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED|QUERY_RESBUF_FULL)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* handle time interval query on single table */
|
||||
// handle time interval query on table
|
||||
static void tableIntervalProcessor(SQInfo *pQInfo) {
|
||||
// STable *pMeterObj = pQInfo->pObj;
|
||||
|
||||
SQueryRuntimeEnv *pRuntimeEnv = &(pQInfo->runtimeEnv);
|
||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
int32_t numOfInterpo = 0;
|
||||
|
||||
while (1) {
|
||||
resetCtxOutputBuf(pRuntimeEnv);
|
||||
vnodeSingleMeterIntervalMainLooper(pRuntimeEnv);
|
||||
tableIntervalProcessImpl(pRuntimeEnv);
|
||||
|
||||
if (pQuery->intervalTime > 0) {
|
||||
pQInfo->subgroupIdx = 0; // always start from 0
|
||||
pQuery->rec.pointsRead = 0;
|
||||
pQuery->rec.size = 0;
|
||||
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
|
||||
|
||||
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx);
|
||||
|
@ -5153,43 +5089,43 @@ static void tableIntervalProcessor(SQInfo *pQInfo) {
|
|||
doRevisedResultsByLimit(pQInfo);
|
||||
break;
|
||||
} else {
|
||||
taosInterpoSetStartInfo(&pRuntimeEnv->interpoInfo, pQuery->rec.pointsRead, pQuery->interpoType);
|
||||
taosInterpoSetStartInfo(&pRuntimeEnv->interpoInfo, pQuery->rec.size, pQuery->interpoType);
|
||||
SData **pInterpoBuf = pRuntimeEnv->pInterpoBuf;
|
||||
|
||||
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
|
||||
memcpy(pInterpoBuf[i]->data, pQuery->sdata[i]->data, pQuery->rec.pointsRead * pQuery->pSelectExpr[i].resBytes);
|
||||
memcpy(pInterpoBuf[i]->data, pQuery->sdata[i]->data, pQuery->rec.size * pQuery->pSelectExpr[i].resBytes);
|
||||
}
|
||||
|
||||
numOfInterpo = 0;
|
||||
pQuery->rec.pointsRead = vnodeQueryResultInterpolate(
|
||||
pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pInterpoBuf, pQuery->rec.pointsRead, &numOfInterpo);
|
||||
pQuery->rec.size = vnodeQueryResultInterpolate(
|
||||
pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pInterpoBuf, pQuery->rec.size, &numOfInterpo);
|
||||
|
||||
dTrace("QInfo: %p interpo completed, final:%d", pQInfo, pQuery->rec.pointsRead);
|
||||
if (pQuery->rec.pointsRead > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
||||
dTrace("QInfo: %p interpo completed, final:%d", pQInfo, pQuery->rec.size);
|
||||
if (pQuery->rec.size > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
||||
doRevisedResultsByLimit(pQInfo);
|
||||
break;
|
||||
}
|
||||
|
||||
// no result generated yet, continue retrieve data
|
||||
pQuery->rec.pointsRead = 0;
|
||||
pQuery->rec.size = 0;
|
||||
}
|
||||
}
|
||||
|
||||
// all data scanned, the group by normal column can return
|
||||
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // todo refactor with merge interval time result
|
||||
pQInfo->subgroupIdx = 0;
|
||||
pQuery->rec.pointsRead = 0;
|
||||
pQuery->rec.size = 0;
|
||||
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
|
||||
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx);
|
||||
}
|
||||
|
||||
pQInfo->rec.pointsRead += pQuery->rec.pointsRead;
|
||||
pQuery->rec.size += pQuery->rec.size;
|
||||
pQInfo->pointsInterpo += numOfInterpo;
|
||||
|
||||
// dTrace("%p vid:%d sid:%d id:%s, %d points returned %d points interpo, totalRead:%d totalInterpo:%d
|
||||
// totalReturn:%d",
|
||||
// pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, numOfInterpo,
|
||||
// pQInfo->pointsRead - pQInfo->pointsInterpo, pQInfo->pointsInterpo, pQInfo->pointsReturned);
|
||||
// pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->size, numOfInterpo,
|
||||
// pQInfo->size - pQInfo->pointsInterpo, pQInfo->pointsInterpo, pQInfo->pointsReturned);
|
||||
}
|
||||
|
||||
void qTableQuery(SQInfo *pQInfo) {
|
||||
|
@ -5201,7 +5137,7 @@ void qTableQuery(SQInfo *pQInfo) {
|
|||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
if (isQueryKilled(pQuery)) {
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
dTrace("QInfo:%p it is already killed, abort", pQInfo);
|
||||
return;
|
||||
}
|
||||
|
@ -5216,16 +5152,16 @@ void qTableQuery(SQInfo *pQInfo) {
|
|||
int32_t numOfInterpo = 0;
|
||||
|
||||
int32_t remain = taosNumOfRemainPoints(&pRuntimeEnv->interpoInfo);
|
||||
pQuery->rec.pointsRead = vnodeQueryResultInterpolate(pQInfo, (tFilePage **)pQuery->sdata,
|
||||
pQuery->rec.size = vnodeQueryResultInterpolate(pQInfo, (tFilePage **)pQuery->sdata,
|
||||
(tFilePage **)pRuntimeEnv->pInterpoBuf, remain, &numOfInterpo);
|
||||
|
||||
doRevisedResultsByLimit(pQInfo);
|
||||
|
||||
pQInfo->pointsInterpo += numOfInterpo;
|
||||
pQInfo->rec.pointsRead += pQuery->rec.pointsRead;
|
||||
pQuery->rec.size += pQuery->rec.size;
|
||||
|
||||
// dTrace("QInfo:%p %d points returned %d points interpo, totalRead:%d totalInterpo:%d totalReturn:%d",
|
||||
// pQInfo, pQuery->pointsRead, numOfInterpo, pQInfo->pointsRead, pQInfo->pointsInterpo,
|
||||
// pQInfo, pQuery->size, numOfInterpo, pQInfo->size, pQInfo->pointsInterpo,
|
||||
// pQInfo->pointsReturned);
|
||||
sem_post(&pQInfo->dataReady);
|
||||
return;
|
||||
|
@ -5235,22 +5171,22 @@ void qTableQuery(SQInfo *pQInfo) {
|
|||
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
||||
// continue to get push data from the group result
|
||||
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) ||
|
||||
(pQuery->intervalTime > 0 && pQInfo->rec.pointsTotal < pQuery->limit.limit)) {
|
||||
(pQuery->intervalTime > 0 && pQuery->rec.total < pQuery->limit.limit)) {
|
||||
// todo limit the output for interval query?
|
||||
pQuery->rec.pointsRead = 0;
|
||||
pQuery->rec.size = 0;
|
||||
pQInfo->subgroupIdx = 0; // always start from 0
|
||||
|
||||
if (pRuntimeEnv->windowResInfo.size > 0) {
|
||||
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
|
||||
pQInfo->rec.pointsRead += pQuery->rec.pointsRead;
|
||||
pQuery->rec.size += pQuery->rec.size;
|
||||
|
||||
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx);
|
||||
|
||||
if (pQuery->rec.pointsRead > 0) {
|
||||
if (pQuery->rec.size > 0) {
|
||||
// dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned %d from group results, totalRead:%d
|
||||
// totalReturn:%d",
|
||||
// pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead,
|
||||
// pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned);
|
||||
// pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->size,
|
||||
// pQInfo->size, pQInfo->pointsInterpo, pQInfo->pointsReturned);
|
||||
|
||||
sem_post(&pQInfo->dataReady);
|
||||
return;
|
||||
|
@ -5260,7 +5196,7 @@ void qTableQuery(SQInfo *pQInfo) {
|
|||
|
||||
// dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned", pQInfo, pMeterObj->vnode,
|
||||
// pMeterObj->sid,
|
||||
// pMeterObj->meterId, pQInfo->pointsRead);
|
||||
// pMeterObj->meterId, pQInfo->size);
|
||||
|
||||
// vnodePrintQueryStatistics(pSupporter);
|
||||
sem_post(&pQInfo->dataReady);
|
||||
|
@ -5268,7 +5204,7 @@ void qTableQuery(SQInfo *pQInfo) {
|
|||
}
|
||||
|
||||
// number of points returned during this query
|
||||
pQuery->rec.pointsRead = 0;
|
||||
pQuery->rec.size = 0;
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
|
@ -5291,10 +5227,10 @@ void qTableQuery(SQInfo *pQInfo) {
|
|||
pQInfo->elapsedTime += (taosGetTimestampUs() - st);
|
||||
|
||||
/* check if query is killed or not */
|
||||
if (isQueryKilled(pQuery)) {
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
dTrace("QInfo:%p query is killed", pQInfo);
|
||||
} else {
|
||||
dTrace("QInfo:%p query task completed, %d points are returned", pQInfo, pQuery->rec.pointsRead);
|
||||
dTrace("QInfo:%p query task completed, %d points are returned", pQInfo, pQuery->rec.size);
|
||||
}
|
||||
|
||||
sem_post(&pQInfo->dataReady);
|
||||
|
@ -5317,7 +5253,7 @@ void qSuperTableQuery(void *pReadMsg) {
|
|||
// assert(pQInfo->refCount >= 1);
|
||||
#if 0
|
||||
SQuery *pQuery = &pQInfo->runtimeEnv.pQuery;
|
||||
pQuery->rec.pointsRead = 0;
|
||||
pQuery->rec.size = 0;
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
if (pQuery->intervalTime > 0 ||
|
||||
|
@ -5333,15 +5269,15 @@ void qSuperTableQuery(void *pReadMsg) {
|
|||
|
||||
/* record the total elapsed time */
|
||||
pQInfo->elapsedTime += (taosGetTimestampUs() - st);
|
||||
pQuery->status = isQueryKilled(pQuery) ? 1 : 0;
|
||||
pQuery->status = isQueryKilled(pQInfo) ? 1 : 0;
|
||||
|
||||
// taosInterpoSetStartInfo(&pQInfo->runtimeEnv.interpoInfo, pQuery->pointsRead,
|
||||
// taosInterpoSetStartInfo(&pQInfo->runtimeEnv.interpoInfo, pQuery->size,
|
||||
// pQInfo->query.interpoType);
|
||||
|
||||
if (pQuery->rec.pointsRead == 0) {
|
||||
if (pQuery->rec.size == 0) {
|
||||
// pQInfo->over = 1;
|
||||
// dTrace("QInfo:%p over, %d meters queried, %d points are returned", pQInfo, pSupporter->numOfMeters,
|
||||
// pQInfo->pointsRead);
|
||||
// pQInfo->size);
|
||||
// vnodePrintQueryStatistics(pSupporter);
|
||||
}
|
||||
|
||||
|
@ -5839,6 +5775,39 @@ static int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void doUpdateExprColumnIndex(SQuery* pQuery) {
|
||||
assert(pQuery->pSelectExpr != NULL && pQuery != NULL);
|
||||
// int32_t i = 0, j = 0;
|
||||
// while (i < pQuery->numOfCols && j < pMeterObj->numOfColumns) {
|
||||
// if (pQuery->colList[i].data.colId == pMeterObj->schema[j].colId) {
|
||||
// pQuery->colList[i++].colIdx = (int16_t)j++;
|
||||
// } else if (pQuery->colList[i].data.colId < pMeterObj->schema[j].colId) {
|
||||
// pQuery->colList[i++].colIdx = -1;
|
||||
// } else if (pQuery->colList[i].data.colId > pMeterObj->schema[j].colId) {
|
||||
// j++;
|
||||
// }
|
||||
// }
|
||||
|
||||
// while (i < pQuery->numOfCols) {
|
||||
// pQuery->colList[i++].colIdx = -1; // not such column in current meter
|
||||
// }
|
||||
|
||||
for(int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
|
||||
SSqlFuncExprMsg* pSqlExprMsg = &pQuery->pSelectExpr[k].pBase;
|
||||
if (pSqlExprMsg->functionId == TSDB_FUNC_ARITHM || pSqlExprMsg->colInfo.flag == TSDB_COL_TAG) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SColIndexEx* pColIndexEx = &pSqlExprMsg->colInfo;
|
||||
for(int32_t f = 0; f < pQuery->numOfCols; ++f) {
|
||||
if (pColIndexEx->colId == pQuery->colList[f].info.colId) {
|
||||
pColIndexEx->colIdx = f;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pExprs,
|
||||
SArray *pTableIdList) {
|
||||
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
|
||||
|
@ -5897,6 +5866,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
|
|||
assert(pExprs[col].resBytes > 0);
|
||||
pQuery->rowSize += pExprs[col].resBytes;
|
||||
}
|
||||
|
||||
doUpdateExprColumnIndex(pQuery);
|
||||
|
||||
int32_t ret = vnodeCreateFilterInfo(pQInfo, pQuery);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
|
@ -5910,12 +5881,14 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
|
|||
}
|
||||
|
||||
// set the output buffer capacity
|
||||
pQuery->capacity = 4096;
|
||||
pQuery->rec.capacity = 4096;
|
||||
pQuery->rec.threshold = 2;
|
||||
|
||||
for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) {
|
||||
assert(pExprs[col].interResBytes >= pExprs[col].resBytes);
|
||||
|
||||
// allocate additional memory for interResults that are usually larger then final results
|
||||
size_t size = (pQuery->capacity + 1) * pExprs[col].resBytes + pExprs[col].interResBytes + sizeof(SData);
|
||||
size_t size = (pQuery->rec.capacity + 1) * pExprs[col].resBytes + pExprs[col].interResBytes + sizeof(SData);
|
||||
pQuery->sdata[col] = (SData *)calloc(1, size);
|
||||
if (pQuery->sdata[col] == NULL) {
|
||||
goto _clean_memory;
|
||||
|
@ -5933,13 +5906,12 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
|
|||
}
|
||||
|
||||
// to make sure third party won't overwrite this structure
|
||||
pQInfo->signature = (uint64_t)pQInfo;
|
||||
pQInfo->signature = pQInfo;
|
||||
pQInfo->pTableIdList = pTableIdList;
|
||||
|
||||
pQuery->pos = -1;
|
||||
// dTrace("vid:%d sid:%d meterId:%s, QInfo is allocated:%p", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId,
|
||||
// pQInfo);
|
||||
|
||||
|
||||
dTrace("QInfo %p is allocated", pQInfo);
|
||||
return pQInfo;
|
||||
|
||||
_clean_memory:
|
||||
|
@ -6092,7 +6064,7 @@ _error:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, SQInfo **pQInfo) {
|
||||
int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, void* param, SQInfo **pQInfo) {
|
||||
assert(pQueryTableMsg != NULL);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -6130,6 +6102,7 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, SQInfo **pQ
|
|||
// pObj->qhandle = vnodeQueryOnMultiMeters(pMeterObjList, pGroupbyExpr, pExprs, pQueryTableMsg, &code);
|
||||
} else {
|
||||
code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pTableIdList, tsdb, pQInfo);
|
||||
// (*pQInfo)->param = param;
|
||||
}
|
||||
|
||||
_query_over:
|
||||
|
@ -6155,13 +6128,13 @@ _query_over:
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo, int32_t *numOfRows, int32_t *rowsize) {
|
||||
int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo) {
|
||||
if (pQInfo == NULL || !isQInfoValid(pQInfo)) {
|
||||
return TSDB_CODE_INVALID_QHANDLE;
|
||||
}
|
||||
|
||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||
if (isQueryKilled(pQuery)) {
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
dTrace("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code);
|
||||
if (pQInfo->code == TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_QUERY_CANCELLED;
|
||||
|
@ -6171,11 +6144,8 @@ int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo, int32_t *numOfRows, int32_t *ro
|
|||
}
|
||||
|
||||
sem_wait(&pQInfo->dataReady);
|
||||
|
||||
*numOfRows = pQInfo->rec.pointsRead;
|
||||
*rowsize = pQuery->rowSize;
|
||||
|
||||
dTrace("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%d", pQInfo, *rowsize, *numOfRows, pQInfo->code);
|
||||
dTrace("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%d", pQInfo, pQuery->rowSize, pQuery->rec.size,
|
||||
pQInfo->code);
|
||||
|
||||
return (pQInfo->code >= 0)? pQInfo->code:(-pQInfo->code);
|
||||
}
|
||||
|
@ -6202,7 +6172,7 @@ static size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int32_t *size) {
|
||||
static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
|
||||
// the remained number of retrieved rows, not the interpolated result
|
||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||
|
||||
|
@ -6225,28 +6195,31 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int32_t *size) {
|
|||
pQuery->sdata[0]->data, strerror(errno));
|
||||
}
|
||||
} else {
|
||||
doCopyQueryResultToMsg(pQInfo, pQInfo->rec.pointsRead, data);
|
||||
doCopyQueryResultToMsg(pQInfo, pQuery->rec.size, data);
|
||||
}
|
||||
|
||||
pQInfo->rec.pointsTotal += pQInfo->rec.pointsRead;
|
||||
dTrace("QInfo:%p current:%d, total:%d", pQInfo, pQInfo->rec.pointsRead, pQInfo->rec.pointsTotal);
|
||||
pQuery->rec.total += pQuery->rec.size;
|
||||
dTrace("QInfo:%p current:%d, total:%d", pQInfo, pQuery->rec.size, pQuery->rec.total);
|
||||
|
||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
// todo if interpolation exists, the result may be dump to client by several rounds
|
||||
}
|
||||
|
||||
static void addToTaskQueue(SQInfo* pQInfo) {
|
||||
// no error occurred, continue retrieving data
|
||||
if (pQInfo->code == TSDB_CODE_SUCCESS) {
|
||||
#ifdef _TD_ARM_
|
||||
dTrace("QInfo:%p set query flag, sig:%" PRIu64 ", func:doDumpQueryResult", pQInfo, pQInfo->signature);
|
||||
#else
|
||||
dTrace("QInfo:%p set query flag, sig:%" PRIu64 ", func:%s", pQInfo, pQInfo->signature, __FUNCTION__);
|
||||
#endif
|
||||
|
||||
// todo add to task queue
|
||||
bool qHasMoreResultsToRetrieve(SQInfo* pQInfo) {
|
||||
if (pQInfo == NULL || pQInfo->signature != pQInfo || pQInfo->code != TSDB_CODE_SUCCESS) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
|
||||
if (Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) {
|
||||
return false;
|
||||
} else if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
|
||||
return true;
|
||||
} else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
||||
return true;
|
||||
} else {
|
||||
assert(0);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6256,13 +6229,12 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c
|
|||
}
|
||||
|
||||
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
|
||||
size_t size = getResultSize(pQInfo, &pQInfo->rec.pointsRead);
|
||||
size_t size = getResultSize(pQInfo, &pQuery->rec.size);
|
||||
*contLen = size + sizeof(SRetrieveTableRsp);
|
||||
|
||||
// todo handle failed to allocate memory
|
||||
*pRsp = (SRetrieveTableRsp *)rpcMallocCont(*contLen);
|
||||
|
||||
(*pRsp)->numOfRows = htonl(pQInfo->rec.pointsRead);
|
||||
(*pRsp)->numOfRows = htonl(pQuery->rec.size);
|
||||
|
||||
int32_t code = pQInfo->code;
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
|
@ -6273,16 +6245,13 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c
|
|||
(*pRsp)->useconds = 0;
|
||||
}
|
||||
|
||||
if (pQInfo->rec.pointsRead > 0 && code == TSDB_CODE_SUCCESS) {
|
||||
code = doDumpQueryResult(pQInfo, (*pRsp)->data, NULL);
|
||||
|
||||
// has more data to return or need next round to execute
|
||||
addToTaskQueue(pQInfo);
|
||||
} else if (isQueryKilled(pQuery)) {
|
||||
code = TSDB_CODE_QUERY_CANCELLED;
|
||||
if (pQuery->rec.size > 0 && code == TSDB_CODE_SUCCESS) {
|
||||
code = doDumpQueryResult(pQInfo, (*pRsp)->data);
|
||||
} else {
|
||||
code = pQInfo->code;
|
||||
}
|
||||
|
||||
if (isQueryKilled(pQuery) || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
||||
if (isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) {
|
||||
(*pRsp)->completed = 1; // notify no more result to client
|
||||
vnodeFreeQInfo(pQInfo);
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@
|
|||
#include "rpcServer.h"
|
||||
#include "rpcHead.h"
|
||||
#include "trpc.h"
|
||||
#include "hash.h"
|
||||
|
||||
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))
|
||||
#define rpcHeadFromCont(cont) ((SRpcHead *) (cont - sizeof(SRpcHead)))
|
||||
|
@ -258,7 +259,8 @@ void *rpcOpen(SRpcInit *pInit) {
|
|||
}
|
||||
|
||||
if (pRpc->connType == TAOS_CONN_SERVER) {
|
||||
pRpc->hash = taosInitStrHash(pRpc->sessions, sizeof(pRpc), taosHashString);
|
||||
// pRpc->hash = taosInitStrHash(pRpc->sessions, sizeof(pRpc), taosHashString);
|
||||
pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true);
|
||||
if (pRpc->hash == NULL) {
|
||||
tError("%s failed to init string hash", pRpc->label);
|
||||
rpcClose(pRpc);
|
||||
|
@ -292,7 +294,8 @@ void rpcClose(void *param) {
|
|||
}
|
||||
}
|
||||
|
||||
taosCleanUpStrHash(pRpc->hash);
|
||||
// taosCleanUpStrHash(pRpc->hash);
|
||||
taosHashCleanup(pRpc->hash);
|
||||
taosTmrCleanUp(pRpc->tmrCtrl);
|
||||
taosIdPoolCleanUp(pRpc->idPool);
|
||||
rpcCloseConnCache(pRpc->pCache);
|
||||
|
@ -507,8 +510,10 @@ static void rpcCloseConn(void *thandle) {
|
|||
|
||||
if ( pRpc->connType == TAOS_CONN_SERVER) {
|
||||
char hashstr[40] = {0};
|
||||
sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType);
|
||||
taosDeleteStrHash(pRpc->hash, hashstr);
|
||||
size_t size = sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType);
|
||||
// taosDeleteStrHash(pRpc->hash, hashstr);
|
||||
// taosHashRemove(pRpc->hash, hashstr, size);
|
||||
|
||||
rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg
|
||||
pConn->pRspMsg = NULL;
|
||||
pConn->inType = 0;
|
||||
|
@ -556,10 +561,11 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
|
|||
char hashstr[40] = {0};
|
||||
SRpcHead *pHead = (SRpcHead *)pRecv->msg;
|
||||
|
||||
sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType);
|
||||
size_t size = sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType);
|
||||
|
||||
// check if it is already allocated
|
||||
SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr));
|
||||
// SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr));
|
||||
SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size));
|
||||
if (ppConn) pConn = *ppConn;
|
||||
if (pConn) return pConn;
|
||||
|
||||
|
@ -591,7 +597,9 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
|
|||
pConn->localPort = (pRpc->localPort + pRpc->index);
|
||||
}
|
||||
|
||||
taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn);
|
||||
// taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn);
|
||||
taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES);
|
||||
|
||||
tTrace("%s %p, rpc connection is allocated, sid:%d id:%s port:%u",
|
||||
pRpc->label, pConn, sid, pConn->user, pConn->localPort);
|
||||
}
|
||||
|
|
|
@ -5312,7 +5312,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
|
||||
while (1) {
|
||||
// check if query is killed or not set the status of query to pass the status check
|
||||
if (isQueryKilled(pQuery)) {
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
|
||||
return cnt;
|
||||
}
|
||||
|
@ -6375,7 +6375,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
pRuntimeEnv->scanFlag = REPEAT_SCAN;
|
||||
|
||||
/* check if query is killed or not */
|
||||
if (isQueryKilled(pQuery)) {
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -105,7 +105,7 @@ static void queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo
|
|||
int32_t start = pSupporter->pSidSet->starterPos[groupIdx];
|
||||
int32_t end = pSupporter->pSidSet->starterPos[groupIdx + 1] - 1;
|
||||
|
||||
if (isQueryKilled(pQuery)) {
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -276,7 +276,7 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo
|
|||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
while (1) {
|
||||
if (isQueryKilled(pQuery)) {
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -363,7 +363,7 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo
|
|||
int32_t j = QUERY_IS_ASC_QUERY(pQuery) ? 0 : numOfBlocks - 1;
|
||||
|
||||
for (; j < numOfBlocks && j >= 0; j += step) {
|
||||
if (isQueryKilled(pQuery)) {
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -603,7 +603,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
|
|||
pSupporter->meterIdx = start;
|
||||
|
||||
for (int32_t k = start; k <= end; ++k, pSupporter->meterIdx++) {
|
||||
if (isQueryKilled(pQuery)) {
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
|
||||
return;
|
||||
}
|
||||
|
@ -630,7 +630,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
|
|||
pSupporter->subgroupIdx);
|
||||
|
||||
for (int32_t k = start; k <= end; ++k) {
|
||||
if (isQueryKilled(pQuery)) {
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
|
||||
return;
|
||||
}
|
||||
|
@ -681,7 +681,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
|
|||
while (pSupporter->meterIdx < pSupporter->numOfMeters) {
|
||||
int32_t k = pSupporter->meterIdx;
|
||||
|
||||
if (isQueryKilled(pQuery)) {
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
|
||||
return;
|
||||
}
|
||||
|
@ -958,7 +958,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
|
|||
|
||||
doMultiMeterSupplementaryScan(pQInfo);
|
||||
|
||||
if (isQueryKilled(pQuery)) {
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
dTrace("QInfo:%p query killed, abort", pQInfo);
|
||||
return;
|
||||
}
|
||||
|
@ -998,7 +998,7 @@ static void vnodeSingleTableFixedOutputProcessor(SQInfo *pQInfo) {
|
|||
vnodeScanAllData(pRuntimeEnv);
|
||||
doFinalizeResult(pRuntimeEnv);
|
||||
|
||||
if (isQueryKilled(pQuery)) {
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1033,7 +1033,7 @@ static void vnodeSingleTableMultiOutputProcessor(SQInfo *pQInfo) {
|
|||
vnodeScanAllData(pRuntimeEnv);
|
||||
doFinalizeResult(pRuntimeEnv);
|
||||
|
||||
if (isQueryKilled(pQuery)) {
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1087,7 +1087,7 @@ static void vnodeSingleMeterIntervalMainLooper(STableQuerySupportObj *pSupporter
|
|||
initCtxOutputBuf(pRuntimeEnv);
|
||||
vnodeScanAllData(pRuntimeEnv);
|
||||
|
||||
if (isQueryKilled(pQuery)) {
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1301,7 +1301,7 @@ void vnodeSingleTableQuery(SSchedMsg *pMsg) {
|
|||
pQInfo->useconds += (taosGetTimestampUs() - st);
|
||||
|
||||
/* check if query is killed or not */
|
||||
if (isQueryKilled(pQuery)) {
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
dTrace("QInfo:%p query is killed", pQInfo);
|
||||
pQInfo->over = 1;
|
||||
} else {
|
||||
|
@ -1345,7 +1345,7 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) {
|
|||
|
||||
/* record the total elapsed time */
|
||||
pQInfo->useconds += (taosGetTimestampUs() - st);
|
||||
pQInfo->over = isQueryKilled(pQuery) ? 1 : 0;
|
||||
pQInfo->over = isQueryKilled(pQInfo) ? 1 : 0;
|
||||
|
||||
taosInterpoSetStartInfo(&pQInfo->pTableQuerySupporter->runtimeEnv.interpoInfo, pQuery->pointsRead,
|
||||
pQInfo->query.interpoType);
|
||||
|
|
|
@ -124,6 +124,7 @@ typedef struct STsdbQueryHandle {
|
|||
int32_t tableIndex;
|
||||
bool isFirstSlot;
|
||||
void * qinfo; // query info handle, for debug purpose
|
||||
SSkipListIterator* memIter;
|
||||
} STsdbQueryHandle;
|
||||
|
||||
int32_t doAllocateBuf(STsdbQueryHandle *pQueryHandle, int32_t rowsPerFileBlock) {
|
||||
|
@ -335,7 +336,6 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max
|
|||
|
||||
SDataRow row = SL_GET_NODE_DATA(node);
|
||||
if (dataRowKey(row) > maxKey) break;
|
||||
// Convert row data to column data
|
||||
|
||||
if (*skey == INT64_MIN) {
|
||||
*skey = dataRowKey(row);
|
||||
|
@ -345,13 +345,13 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max
|
|||
|
||||
int32_t offset = 0;
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoEx* pColInfo = taosArrayGet(pHandle->pColumns, 0);
|
||||
SColumnInfoEx* pColInfo = taosArrayGet(pHandle->pColumns, i);
|
||||
memcpy(pColInfo->pData + numOfRows*pColInfo->info.bytes, dataRowTuple(row) + offset, pColInfo->info.bytes);
|
||||
offset += pColInfo->info.bytes;
|
||||
}
|
||||
|
||||
numOfRows++;
|
||||
if (numOfRows > maxRowsToRead) break;
|
||||
if (numOfRows >= maxRowsToRead) break;
|
||||
};
|
||||
|
||||
return numOfRows;
|
||||
|
@ -368,8 +368,13 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) {
|
|||
int32_t rows = 0;
|
||||
|
||||
if (pTable->mem != NULL) {
|
||||
SSkipListIterator* iter = tSkipListCreateIter(pTable->mem->pData);
|
||||
rows = tsdbReadRowsFromCache(iter, INT64_MAX, 4000, &skey, &ekey, pHandle);
|
||||
|
||||
// create mem table iterator if it is not created yet
|
||||
if (pHandle->memIter == NULL) {
|
||||
pHandle->memIter = tSkipListCreateIter(pTable->mem->pData);
|
||||
}
|
||||
|
||||
rows = tsdbReadRowsFromCache(pHandle->memIter, INT64_MAX, 2, &skey, &ekey, pHandle);
|
||||
}
|
||||
|
||||
SDataBlockInfo blockInfo = {
|
||||
|
@ -392,7 +397,9 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t *pQueryHandle, SData
|
|||
}
|
||||
|
||||
SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList) {
|
||||
|
||||
// in case of data in cache, all data has been kept in column info object.
|
||||
STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
|
||||
return pHandle->pColumns;
|
||||
}
|
||||
|
||||
int32_t tsdbResetQuery(tsdb_query_handle_t *pQueryHandle, STimeWindow *window, tsdbpos_t position, int16_t order) {}
|
||||
|
|
|
@ -64,6 +64,9 @@ int main(int argc, char *argv[]) {
|
|||
memset(buf, 0, 512);
|
||||
}
|
||||
|
||||
taos_close(taos);
|
||||
|
||||
getchar();
|
||||
return 0;
|
||||
|
||||
taos_query(taos, "drop database demo");
|
||||
|
|
Loading…
Reference in New Issue