refactor the allocation buffer for load data block into memory
This commit is contained in:
parent
c2e91fcb75
commit
869171d233
|
@ -266,7 +266,8 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
|
|||
} else if (functionId == TSDB_FUNC_PERCT) {
|
||||
*type = (int16_t)TSDB_DATA_TYPE_DOUBLE;
|
||||
*bytes = (int16_t)sizeof(double);
|
||||
*intermediateResBytes = POINTER_BYTES;
|
||||
//*intermediateResBytes = POINTER_BYTES;
|
||||
*intermediateResBytes = (int16_t)sizeof(double);
|
||||
} else if (functionId == TSDB_FUNC_LEASTSQR) {
|
||||
*type = TSDB_DATA_TYPE_BINARY;
|
||||
*bytes = TSDB_AVG_FUNCTION_INTER_BUFFER_SIZE; // string
|
||||
|
|
|
@ -262,15 +262,15 @@ int taos_query_imp(STscObj* pObj, SSqlObj* pSql) {
|
|||
|
||||
int taos_query(TAOS *taos, const char *sqlstr) {
|
||||
STscObj *pObj = (STscObj *)taos;
|
||||
SSqlObj *pSql = pObj->pSql;
|
||||
SSqlRes *pRes = &pSql->res;
|
||||
|
||||
if (pObj == NULL || pObj->signature != pObj) {
|
||||
globalCode = TSDB_CODE_DISCONNECTED;
|
||||
return TSDB_CODE_DISCONNECTED;
|
||||
}
|
||||
|
||||
int32_t sqlLen = strlen(sqlstr);
|
||||
SSqlObj *pSql = pObj->pSql;
|
||||
SSqlRes *pRes = &pSql->res;
|
||||
|
||||
size_t sqlLen = strlen(sqlstr);
|
||||
if (sqlLen > TSDB_MAX_SQL_LEN) {
|
||||
tscError("%p sql too long", pSql);
|
||||
pRes->code = TSDB_CODE_INVALID_SQL;
|
||||
|
|
|
@ -122,7 +122,6 @@ typedef struct RuntimeEnvironment {
|
|||
SQuery* pQuery;
|
||||
SMeterObj* pMeterObj;
|
||||
SQLFunctionCtx* pCtx;
|
||||
char* buffer; /* column data load buffer, colDataBuffer is point to this value */
|
||||
SQueryLoadBlockInfo loadBlockInfo; /* record current block load information */
|
||||
SQueryLoadCompBlockInfo loadCompBlockInfo; /* record current compblock information in SQuery */
|
||||
|
||||
|
|
|
@ -1522,7 +1522,6 @@ int vnodeQueryFromFile(SMeterObj *pObj, SQuery *pQuery) {
|
|||
int lastPos = -1, startPos;
|
||||
int col, step, code = 0;
|
||||
char * pRead, *pData;
|
||||
char * buffer;
|
||||
SData * sdata[TSDB_MAX_COLUMNS];
|
||||
SCompBlock *pBlock = NULL;
|
||||
SVnodeObj * pVnode = &vnodeList[pObj->vnode];
|
||||
|
@ -1556,12 +1555,10 @@ int vnodeQueryFromFile(SMeterObj *pObj, SQuery *pQuery) {
|
|||
|
||||
if (pQuery->over) return 0;
|
||||
|
||||
// allocate memory more efficiently
|
||||
buffer = calloc(1, pQuery->dataRowSize * pBlock->numOfPoints + sizeof(SData) * pQuery->numOfCols);
|
||||
sdata[0] = (SData *)buffer;
|
||||
for (col = 1; col < pQuery->numOfCols; ++col)
|
||||
sdata[col] =
|
||||
(SData *)(((char *)sdata[col - 1]) + sizeof(SData) + pBlock->numOfPoints * pQuery->colList[col - 1].data.bytes);
|
||||
// To make sure the start position of each buffer is aligned to 4bytes in 32-bit ARM system.
|
||||
for(col = 0; col < pQuery->numOfCols; ++col) {
|
||||
sdata[col] = calloc(1, sizeof(SData) + pBlock->numOfPoints * pQuery->colList[col].data.bytes + EXTRA_BYTES);
|
||||
}
|
||||
|
||||
/*
|
||||
* timestamp column is fetched in any cases. Therefore, if the query does not fetch primary column,
|
||||
|
@ -1768,7 +1765,9 @@ _next:
|
|||
pQuery->slot = QUERY_IS_ASC_QUERY(pQuery) ? 0 : pQuery->numOfBlocks - 1;
|
||||
}
|
||||
|
||||
tfree(buffer);
|
||||
for(int32_t i = 0; i < pQuery->numOfCols; ++i) {
|
||||
tfree(sdata[i]);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -1931,19 +1931,13 @@ static int32_t setupQueryRuntimeEnv(SMeterObj *pMeterObj, SQuery *pQuery, SQuery
|
|||
// for loading block data in memory
|
||||
assert(vnodeList[pMeterObj->vnode].cfg.rowsInFileBlock == pMeterObj->pointsPerFileBlock);
|
||||
|
||||
size_t loadDataBlockBufferSize =
|
||||
pQuery->dataRowSize * (pMeterObj->pointsPerFileBlock) + (sizeof(SData) + EXTRA_BYTES) * pQuery->numOfCols;
|
||||
pRuntimeEnv->buffer = (char *)malloc(loadDataBlockBufferSize);
|
||||
|
||||
if (pRuntimeEnv->buffer == NULL) {
|
||||
goto _error_clean;
|
||||
}
|
||||
|
||||
pRuntimeEnv->colDataBuffer[0] = (SData *)pRuntimeEnv->buffer;
|
||||
for (int32_t i = 1; i < pQuery->numOfCols; ++i) {
|
||||
int32_t bytes = pQuery->colList[i - 1].data.bytes;
|
||||
pRuntimeEnv->colDataBuffer[i] = (SData *)(((void *)pRuntimeEnv->colDataBuffer[i - 1]) + sizeof(SData) +
|
||||
EXTRA_BYTES + pMeterObj->pointsPerFileBlock * bytes);
|
||||
// To make sure the start position of each buffer is aligned to 4bytes in 32-bit ARM system.
|
||||
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
|
||||
int32_t bytes = pQuery->colList[i].data.bytes;
|
||||
pRuntimeEnv->colDataBuffer[i] = calloc(1, sizeof(SData) + EXTRA_BYTES + pMeterObj->pointsPerFileBlock * bytes);
|
||||
if (pRuntimeEnv->colDataBuffer[i] == NULL) {
|
||||
goto _error_clean;
|
||||
}
|
||||
}
|
||||
|
||||
// record the maximum column width among columns of this meter/metric
|
||||
|
@ -1978,7 +1972,11 @@ static int32_t setupQueryRuntimeEnv(SMeterObj *pMeterObj, SQuery *pQuery, SQuery
|
|||
_error_clean:
|
||||
tfree(pRuntimeEnv->resultInfo);
|
||||
tfree(pRuntimeEnv->pCtx);
|
||||
tfree(pRuntimeEnv->buffer);
|
||||
|
||||
for(int32_t i = 0; i < pRuntimeEnv->pQuery->numOfCols; ++i) {
|
||||
tfree(pRuntimeEnv->colDataBuffer[i]);
|
||||
}
|
||||
|
||||
tfree(pRuntimeEnv->unzipBuffer);
|
||||
tfree(pRuntimeEnv->secondaryUnzipBuffer);
|
||||
|
||||
|
@ -1995,7 +1993,10 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
}
|
||||
|
||||
dTrace("QInfo:%p teardown runtime env", GET_QINFO_ADDR(pRuntimeEnv->pQuery));
|
||||
tfree(pRuntimeEnv->buffer);
|
||||
for(int32_t i = 0; i < pRuntimeEnv->pQuery->numOfCols; ++i) {
|
||||
tfree(pRuntimeEnv->colDataBuffer[i]);
|
||||
}
|
||||
|
||||
tfree(pRuntimeEnv->secondaryUnzipBuffer);
|
||||
|
||||
taosCleanUpIntHash(pRuntimeEnv->hashList);
|
||||
|
|
Loading…
Reference in New Issue