[td-32] fix memory leaks, and fix bugs in select * query.
This commit is contained in:
parent
b4e4e5a50b
commit
c2a3c50f07
|
@ -1238,22 +1238,11 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
|
||||||
goto _clean;
|
goto _clean;
|
||||||
}
|
}
|
||||||
|
|
||||||
// submit to more than one vnode
|
|
||||||
if (pCmd->pDataBlocks->nSize > 0) {
|
if (pCmd->pDataBlocks->nSize > 0) {
|
||||||
// merge according to vgId
|
// merge according to vgId
|
||||||
if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) {
|
if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) {
|
||||||
goto _error_clean;
|
goto _error_clean;
|
||||||
}
|
}
|
||||||
|
|
||||||
STableDataBlocks *pDataBlock = pCmd->pDataBlocks->pData[0];
|
|
||||||
if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) {
|
|
||||||
goto _error_clean;
|
|
||||||
}
|
|
||||||
|
|
||||||
pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
|
|
||||||
|
|
||||||
// set the next sent data vnode index in data block arraylist
|
|
||||||
pTableMetaInfo->vnodeIndex = 1;
|
|
||||||
} else {
|
} else {
|
||||||
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
|
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1489,11 +1489,9 @@ static int32_t setExprInfoForFunctions(SQueryInfo* pQueryInfo, SSchema* pSchema,
|
||||||
SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, resColIdx, functionID, pColIndex, type, bytes, bytes);
|
SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, resColIdx, functionID, pColIndex, type, bytes, bytes);
|
||||||
strncpy(pExpr->aliasName, columnName, tListLen(pExpr->aliasName));
|
strncpy(pExpr->aliasName, columnName, tListLen(pExpr->aliasName));
|
||||||
|
|
||||||
// for point interpolation/last_row query, we need the timestamp column to be loaded
|
// for all querie, the timestamp column meeds to be loaded
|
||||||
SColumnIndex index = {.tableIndex = pColIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
|
SColumnIndex index = {.tableIndex = pColIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
|
||||||
if (functionID == TSDB_FUNC_INTERP || functionID == TSDB_FUNC_LAST_ROW) {
|
|
||||||
tscColumnBaseInfoInsert(pQueryInfo, &index);
|
tscColumnBaseInfoInsert(pQueryInfo, &index);
|
||||||
}
|
|
||||||
|
|
||||||
SColumnList ids = getColumnList(1, pColIndex->tableIndex, pColIndex->columnIndex);
|
SColumnList ids = getColumnList(1, pColIndex->tableIndex, pColIndex->columnIndex);
|
||||||
insertResultField(pQueryInfo, resColIdx, &ids, bytes, type, columnName, pExpr);
|
insertResultField(pQueryInfo, resColIdx, &ids, bytes, type, columnName, pExpr);
|
||||||
|
@ -1582,6 +1580,9 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
|
||||||
|
tscColumnBaseInfoInsert(pQueryInfo, &tsCol);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
case TK_SUM:
|
case TK_SUM:
|
||||||
|
@ -1690,6 +1691,9 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
|
||||||
|
tscColumnBaseInfoInsert(pQueryInfo, &tsCol);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
case TK_FIRST:
|
case TK_FIRST:
|
||||||
|
@ -1708,7 +1712,6 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
|
||||||
}
|
}
|
||||||
|
|
||||||
/* in first/last function, multiple columns can be add to resultset */
|
/* in first/last function, multiple columns can be add to resultset */
|
||||||
|
|
||||||
for (int32_t i = 0; i < pItem->pNode->pParam->nExpr; ++i) {
|
for (int32_t i = 0; i < pItem->pNode->pParam->nExpr; ++i) {
|
||||||
tSQLExprItem* pParamElem = &(pItem->pNode->pParam->a[i]);
|
tSQLExprItem* pParamElem = &(pItem->pNode->pParam->a[i]);
|
||||||
if (pParamElem->pNode->nSQLOptr != TK_ALL && pParamElem->pNode->nSQLOptr != TK_ID) {
|
if (pParamElem->pNode->nSQLOptr != TK_ALL && pParamElem->pNode->nSQLOptr != TK_ID) {
|
||||||
|
@ -1773,6 +1776,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
|
||||||
numOfFields += tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
|
numOfFields += tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1891,6 +1895,8 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
|
||||||
default:
|
default:
|
||||||
return TSDB_CODE_INVALID_SQL;
|
return TSDB_CODE_INVALID_SQL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo refactor
|
// todo refactor
|
||||||
|
|
|
@ -341,16 +341,10 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
|
||||||
(*pSql->fp)(pSql->param, taosres, rpcMsg->code);
|
(*pSql->fp)(pSql->param, taosres, rpcMsg->code);
|
||||||
|
|
||||||
if (shouldFree) {
|
if (shouldFree) {
|
||||||
// If it is failed, all objects allocated during execution taos_connect_a should be released
|
|
||||||
if (command == TSDB_SQL_CONNECT) {
|
|
||||||
taos_close(pObj);
|
|
||||||
tscTrace("%p Async sql close failed connection", pSql);
|
|
||||||
} else {
|
|
||||||
tscFreeSqlObj(pSql);
|
tscFreeSqlObj(pSql);
|
||||||
tscTrace("%p Async sql is automatically freed", pSql);
|
tscTrace("%p Async sql is automatically freed", pSql);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
rpcFreeCont(rpcMsg->pCont);
|
rpcFreeCont(rpcMsg->pCont);
|
||||||
}
|
}
|
||||||
|
|
|
@ -594,11 +594,7 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (numOfTableHasRes >= 2) { // do merge result
|
if (numOfTableHasRes >= 2) { // do merge result
|
||||||
|
|
||||||
success = (doSetResultRowData(pSql->pSubs[0]) != NULL) && (doSetResultRowData(pSql->pSubs[1]) != NULL);
|
success = (doSetResultRowData(pSql->pSubs[0]) != NULL) && (doSetResultRowData(pSql->pSubs[1]) != NULL);
|
||||||
// TSKEY key1 = *(TSKEY *)pRes1->tsrow[0];
|
|
||||||
// TSKEY key2 = *(TSKEY *)pRes2->tsrow[0];
|
|
||||||
// printf("first:%" PRId64 ", second:%" PRId64 "\n", key1, key2);
|
|
||||||
} else { // only one subquery
|
} else { // only one subquery
|
||||||
SSqlObj *pSub = pSql->pSubs[0];
|
SSqlObj *pSub = pSql->pSubs[0];
|
||||||
if (pSub == NULL) {
|
if (pSub == NULL) {
|
||||||
|
@ -674,14 +670,13 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
||||||
SSqlRes *pRes = &pSql->res;
|
SSqlRes *pRes = &pSql->res;
|
||||||
|
|
||||||
if (pRes->qhandle == 0 ||
|
if (pRes->qhandle == 0 ||
|
||||||
pRes->completed ||
|
|
||||||
pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
|
pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
|
||||||
pCmd->command == TSDB_SQL_INSERT) {
|
pCmd->command == TSDB_SQL_INSERT) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// current data are exhausted, fetch more data
|
// current data are exhausted, fetch more data
|
||||||
if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows &&
|
if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && pRes->completed != true &&
|
||||||
(pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_METRIC || pCmd->command == TSDB_SQL_FETCH))) {
|
(pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_METRIC || pCmd->command == TSDB_SQL_FETCH))) {
|
||||||
taos_fetch_rows_a(res, asyncFetchCallback, pSql->pTscObj);
|
taos_fetch_rows_a(res, asyncFetchCallback, pSql->pTscObj);
|
||||||
|
|
||||||
|
|
|
@ -504,7 +504,6 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
|
||||||
}
|
}
|
||||||
|
|
||||||
tsem_init(&pSql->rspSem, 0, 0);
|
tsem_init(&pSql->rspSem, 0, 0);
|
||||||
tsem_init(&pSql->emptyRspSem, 0, 1);
|
|
||||||
|
|
||||||
SSqlInfo SQLInfo = {0};
|
SSqlInfo SQLInfo = {0};
|
||||||
tSQLParse(&SQLInfo, pSql->sqlstr);
|
tSQLParse(&SQLInfo, pSql->sqlstr);
|
||||||
|
|
|
@ -423,9 +423,6 @@ void tscFreeResData(SSqlObj* pSql) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void tscFreeSqlResult(SSqlObj* pSql) {
|
void tscFreeSqlResult(SSqlObj* pSql) {
|
||||||
//TODO not free
|
|
||||||
return;
|
|
||||||
|
|
||||||
tfree(pSql->res.pRsp);
|
tfree(pSql->res.pRsp);
|
||||||
pSql->res.row = 0;
|
pSql->res.row = 0;
|
||||||
pSql->res.numOfRows = 0;
|
pSql->res.numOfRows = 0;
|
||||||
|
@ -469,8 +466,6 @@ void tscFreeSqlObjPartial(SSqlObj* pSql) {
|
||||||
tscFreeSqlCmdData(pCmd);
|
tscFreeSqlCmdData(pCmd);
|
||||||
|
|
||||||
tscTrace("%p free sqlObj partial completed", pSql);
|
tscTrace("%p free sqlObj partial completed", pSql);
|
||||||
|
|
||||||
tscFreeSqlCmdData(pCmd);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void tscFreeSqlObj(SSqlObj* pSql) {
|
void tscFreeSqlObj(SSqlObj* pSql) {
|
||||||
|
@ -489,10 +484,7 @@ void tscFreeSqlObj(SSqlObj* pSql) {
|
||||||
|
|
||||||
pCmd->allocSize = 0;
|
pCmd->allocSize = 0;
|
||||||
|
|
||||||
if (pSql->fp == NULL) {
|
|
||||||
tsem_destroy(&pSql->rspSem);
|
tsem_destroy(&pSql->rspSem);
|
||||||
tsem_destroy(&pSql->emptyRspSem);
|
|
||||||
}
|
|
||||||
free(pSql);
|
free(pSql);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1751,18 +1743,10 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t command = pSql->cmd.command;
|
int32_t command = pSql->cmd.command;
|
||||||
if (pTscObj->pSql == pSql) {
|
if (command == TSDB_SQL_CONNECT) {
|
||||||
/*
|
|
||||||
* in case of taos_connect_a query, the object should all be released, even it is the
|
|
||||||
* master sql object. Otherwise, the master sql should not be released
|
|
||||||
*/
|
|
||||||
if (command == TSDB_SQL_CONNECT && pSql->res.code != TSDB_CODE_SUCCESS) {
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (command == TSDB_SQL_INSERT) {
|
if (command == TSDB_SQL_INSERT) {
|
||||||
SSqlCmd* pCmd = &pSql->cmd;
|
SSqlCmd* pCmd = &pSql->cmd;
|
||||||
|
|
||||||
|
|
|
@ -53,9 +53,9 @@
|
||||||
|
|
||||||
/* get the qinfo struct address from the query struct address */
|
/* get the qinfo struct address from the query struct address */
|
||||||
#define GET_COLUMN_BYTES(query, colidx) \
|
#define GET_COLUMN_BYTES(query, colidx) \
|
||||||
((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].info.bytes)
|
((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdx].info.bytes)
|
||||||
#define GET_COLUMN_TYPE(query, colidx) \
|
#define GET_COLUMN_TYPE(query, colidx) \
|
||||||
((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].info.type)
|
((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdx].info.type)
|
||||||
|
|
||||||
typedef struct SPointInterpoSupporter {
|
typedef struct SPointInterpoSupporter {
|
||||||
int32_t numOfCols;
|
int32_t numOfCols;
|
||||||
|
@ -1498,16 +1498,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, SColumnModel
|
||||||
SColIndexEx * pColIndexEx = &pSqlFuncMsg->colInfo;
|
SColIndexEx * pColIndexEx = &pSqlFuncMsg->colInfo;
|
||||||
|
|
||||||
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
|
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
|
||||||
|
|
||||||
if (TSDB_COL_IS_TAG(pSqlFuncMsg->colInfo.flag)) { // process tag column info
|
|
||||||
SSchema *pSchema = getColumnModelSchema(pTagsSchema, pColIndexEx->colIdx);
|
|
||||||
|
|
||||||
pCtx->inputType = pSchema->type;
|
|
||||||
pCtx->inputBytes = pSchema->bytes;
|
|
||||||
} else {
|
|
||||||
pCtx->inputType = GET_COLUMN_TYPE(pQuery, i);
|
pCtx->inputType = GET_COLUMN_TYPE(pQuery, i);
|
||||||
pCtx->inputBytes = GET_COLUMN_BYTES(pQuery, i);
|
pCtx->inputBytes = GET_COLUMN_BYTES(pQuery, i);
|
||||||
}
|
|
||||||
|
|
||||||
pCtx->ptsOutputBuf = NULL;
|
pCtx->ptsOutputBuf = NULL;
|
||||||
|
|
||||||
|
@ -1891,8 +1883,6 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) {
|
||||||
|
|
||||||
pQuery->checkBufferInLoop = hasMultioutput ? 1 : 0;
|
pQuery->checkBufferInLoop = hasMultioutput ? 1 : 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// pQuery->pointsOffset = pQuery->pointsToRead;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -2552,7 +2542,7 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBl
|
||||||
// return DISK_DATA_LOAD_FAILED;
|
// return DISK_DATA_LOAD_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pStatis == NULL) {
|
if (*pStatis == NULL) {
|
||||||
pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL);
|
pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -5025,11 +5015,8 @@ static void tableFixedOutputProcessor(SQInfo *pQInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tableMultiOutputProcessor(SQInfo *pQInfo) {
|
static void tableMultiOutputProcessor(SQInfo *pQInfo) {
|
||||||
#if 0
|
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||||
SQuery * pQuery = &pQInfo->query;
|
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||||
SMeterObj *pMeterObj = pQInfo->pObj;
|
|
||||||
|
|
||||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->pTableQuerySupporter->runtimeEnv;
|
|
||||||
|
|
||||||
// for ts_comp query, re-initialized is not allowed
|
// for ts_comp query, re-initialized is not allowed
|
||||||
if (!isTSCompQuery(pQuery)) {
|
if (!isTSCompQuery(pQuery)) {
|
||||||
|
@ -5044,8 +5031,8 @@ static void tableMultiOutputProcessor(SQInfo *pQInfo) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
pQuery->pointsRead = getNumOfResult(pRuntimeEnv);
|
pQuery->rec.pointsRead = getNumOfResult(pRuntimeEnv);
|
||||||
if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols > 0 && pQuery->pointsRead > 0) {
|
if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols > 0 && pQuery->rec.pointsRead > 0) {
|
||||||
doSkipResults(pRuntimeEnv);
|
doSkipResults(pRuntimeEnv);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5053,40 +5040,31 @@ static void tableMultiOutputProcessor(SQInfo *pQInfo) {
|
||||||
* 1. if pQuery->pointsRead == 0, pQuery->limit.offset >= 0, still need to check data
|
* 1. if pQuery->pointsRead == 0, pQuery->limit.offset >= 0, still need to check data
|
||||||
* 2. if pQuery->pointsRead > 0, pQuery->limit.offset must be 0
|
* 2. if pQuery->pointsRead > 0, pQuery->limit.offset must be 0
|
||||||
*/
|
*/
|
||||||
if (pQuery->pointsRead > 0 || Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) {
|
if (pQuery->rec.pointsRead > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
TSKEY nextTimestamp = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos);
|
|
||||||
assert(nextTimestamp > 0 || ((nextTimestamp < 0) && Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)));
|
|
||||||
|
|
||||||
dTrace("QInfo:%p vid:%d sid:%d id:%s, skip current result, offset:%" PRId64 ", next qrange:%" PRId64 "-%" PRId64,
|
dTrace("QInfo:%p vid:%d sid:%d id:%s, skip current result, offset:%" PRId64 ", next qrange:%" PRId64 "-%" PRId64,
|
||||||
pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->limit.offset, pQuery->lastKey,
|
pQInfo, pQuery->limit.offset, pQuery->lastKey);
|
||||||
pQuery->ekey);
|
|
||||||
|
|
||||||
resetCtxOutputBuf(pRuntimeEnv);
|
resetCtxOutputBuf(pRuntimeEnv);
|
||||||
}
|
}
|
||||||
|
|
||||||
doRevisedResultsByLimit(pQInfo);
|
doRevisedResultsByLimit(pQInfo);
|
||||||
pQInfo->pointsRead += pQuery->pointsRead;
|
pQInfo->rec.pointsRead += pQuery->rec.pointsRead;
|
||||||
|
|
||||||
if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) {
|
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
|
||||||
TSKEY nextTimestamp = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos);
|
// dTrace("QInfo:%p vid:%d sid:%d id:%s, query abort due to buffer limitation, next qrange:%" PRId64 "-%" PRId64,
|
||||||
assert(nextTimestamp > 0 || ((nextTimestamp < 0) && Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)));
|
// pQInfo, pQuery->lastKey, pQuery->ekey);
|
||||||
|
|
||||||
dTrace("QInfo:%p vid:%d sid:%d id:%s, query abort due to buffer limitation, next qrange:%" PRId64 "-%" PRId64,
|
|
||||||
pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->lastKey, pQuery->ekey);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned, totalRead:%d totalReturn:%d", pQInfo, pMeterObj->vnode,
|
// dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned, totalRead:%d totalReturn:%d", pQInfo, pMeterObj->vnode,
|
||||||
pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, pQInfo->pointsRead, pQInfo->pointsReturned);
|
// pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, pQInfo->pointsRead, pQInfo->pointsReturned);
|
||||||
|
|
||||||
pQuery->pointsOffset = pQuery->pointsToRead; // restore the available buffer
|
// pQuery->pointsOffset = pQuery->pointsToRead; //restore the available buffer
|
||||||
if (!isTSCompQuery(pQuery)) {
|
// if (!isTSCompQuery(pQuery)) {
|
||||||
assert(pQuery->pointsRead <= pQuery->pointsToRead);
|
// assert(pQuery->pointsRead <= pQuery->pointsToRead);
|
||||||
}
|
// }
|
||||||
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) {
|
static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
|
@ -5127,10 +5105,8 @@ static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* handle time interval query on single table */
|
// handle time interval query on table
|
||||||
static void tableIntervalProcessor(SQInfo *pQInfo) {
|
static void tableIntervalProcessor(SQInfo *pQInfo) {
|
||||||
// STable *pMeterObj = pQInfo->pObj;
|
|
||||||
|
|
||||||
SQueryRuntimeEnv *pRuntimeEnv = &(pQInfo->runtimeEnv);
|
SQueryRuntimeEnv *pRuntimeEnv = &(pQInfo->runtimeEnv);
|
||||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
|
@ -5839,6 +5815,39 @@ static int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void doUpdateExprColumnIndex(SQuery* pQuery) {
|
||||||
|
assert(pQuery->pSelectExpr != NULL && pQuery != NULL);
|
||||||
|
// int32_t i = 0, j = 0;
|
||||||
|
// while (i < pQuery->numOfCols && j < pMeterObj->numOfColumns) {
|
||||||
|
// if (pQuery->colList[i].data.colId == pMeterObj->schema[j].colId) {
|
||||||
|
// pQuery->colList[i++].colIdx = (int16_t)j++;
|
||||||
|
// } else if (pQuery->colList[i].data.colId < pMeterObj->schema[j].colId) {
|
||||||
|
// pQuery->colList[i++].colIdx = -1;
|
||||||
|
// } else if (pQuery->colList[i].data.colId > pMeterObj->schema[j].colId) {
|
||||||
|
// j++;
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
// while (i < pQuery->numOfCols) {
|
||||||
|
// pQuery->colList[i++].colIdx = -1; // not such column in current meter
|
||||||
|
// }
|
||||||
|
|
||||||
|
for(int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
|
||||||
|
SSqlFuncExprMsg* pSqlExprMsg = &pQuery->pSelectExpr[k].pBase;
|
||||||
|
if (pSqlExprMsg->functionId == TSDB_FUNC_ARITHM || pSqlExprMsg->colInfo.flag == TSDB_COL_TAG) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
SColIndexEx* pColIndexEx = &pSqlExprMsg->colInfo;
|
||||||
|
for(int32_t f = 0; f < pQuery->numOfCols; ++f) {
|
||||||
|
if (pColIndexEx->colId == pQuery->colList[f].info.colId) {
|
||||||
|
pColIndexEx->colIdx = f;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pExprs,
|
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pExprs,
|
||||||
SArray *pTableIdList) {
|
SArray *pTableIdList) {
|
||||||
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
|
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
|
||||||
|
@ -5898,6 +5907,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
|
||||||
pQuery->rowSize += pExprs[col].resBytes;
|
pQuery->rowSize += pExprs[col].resBytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
doUpdateExprColumnIndex(pQuery);
|
||||||
|
|
||||||
int32_t ret = vnodeCreateFilterInfo(pQInfo, pQuery);
|
int32_t ret = vnodeCreateFilterInfo(pQInfo, pQuery);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
goto _clean_memory;
|
goto _clean_memory;
|
||||||
|
@ -5933,7 +5944,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
|
||||||
}
|
}
|
||||||
|
|
||||||
// to make sure third party won't overwrite this structure
|
// to make sure third party won't overwrite this structure
|
||||||
pQInfo->signature = (uint64_t)pQInfo;
|
pQInfo->signature = pQInfo;
|
||||||
pQInfo->pTableIdList = pTableIdList;
|
pQInfo->pTableIdList = pTableIdList;
|
||||||
|
|
||||||
pQuery->pos = -1;
|
pQuery->pos = -1;
|
||||||
|
|
|
@ -335,7 +335,6 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max
|
||||||
|
|
||||||
SDataRow row = SL_GET_NODE_DATA(node);
|
SDataRow row = SL_GET_NODE_DATA(node);
|
||||||
if (dataRowKey(row) > maxKey) break;
|
if (dataRowKey(row) > maxKey) break;
|
||||||
// Convert row data to column data
|
|
||||||
|
|
||||||
if (*skey == INT64_MIN) {
|
if (*skey == INT64_MIN) {
|
||||||
*skey = dataRowKey(row);
|
*skey = dataRowKey(row);
|
||||||
|
@ -345,13 +344,13 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max
|
||||||
|
|
||||||
int32_t offset = 0;
|
int32_t offset = 0;
|
||||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColumnInfoEx* pColInfo = taosArrayGet(pHandle->pColumns, 0);
|
SColumnInfoEx* pColInfo = taosArrayGet(pHandle->pColumns, i);
|
||||||
memcpy(pColInfo->pData + numOfRows*pColInfo->info.bytes, dataRowTuple(row) + offset, pColInfo->info.bytes);
|
memcpy(pColInfo->pData + numOfRows*pColInfo->info.bytes, dataRowTuple(row) + offset, pColInfo->info.bytes);
|
||||||
offset += pColInfo->info.bytes;
|
offset += pColInfo->info.bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
if (numOfRows > maxRowsToRead) break;
|
if (numOfRows >= maxRowsToRead) break;
|
||||||
};
|
};
|
||||||
|
|
||||||
return numOfRows;
|
return numOfRows;
|
||||||
|
@ -392,7 +391,9 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t *pQueryHandle, SData
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList) {
|
SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList) {
|
||||||
|
// in case of data in cache, all data has been kept in column info object.
|
||||||
|
STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
|
||||||
|
return pHandle->pColumns;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbResetQuery(tsdb_query_handle_t *pQueryHandle, STimeWindow *window, tsdbpos_t position, int16_t order) {}
|
int32_t tsdbResetQuery(tsdb_query_handle_t *pQueryHandle, STimeWindow *window, tsdbpos_t position, int16_t order) {}
|
||||||
|
|
Loading…
Reference in New Issue