[vnode] Fix a few error handling as well as memory leak
1. Fix error handling in vnodeProcessQueryMsg() where qCreateQueryInfo() fails. 2. Inside qCreateQueryInfo(), cleanup pQInfo upon failure, also add a missing goto statement.
This commit is contained in:
parent
5f132980e8
commit
f38f691796
|
@ -232,7 +232,7 @@ bool getNeighborPoints(SQInfo *pQInfo, void *pMeterObj, SPointInterpoSupporter *
|
||||||
bool doFilterData(SQuery *pQuery, int32_t elemPos) {
|
bool doFilterData(SQuery *pQuery, int32_t elemPos) {
|
||||||
for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
|
for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
|
||||||
SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k];
|
SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k];
|
||||||
|
|
||||||
char *pElem = pFilterInfo->pData + pFilterInfo->info.bytes * elemPos;
|
char *pElem = pFilterInfo->pData + pFilterInfo->info.bytes * elemPos;
|
||||||
if (isNull(pElem, pFilterInfo->info.type)) {
|
if (isNull(pElem, pFilterInfo->info.type)) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -241,7 +241,7 @@ bool doFilterData(SQuery *pQuery, int32_t elemPos) {
|
||||||
bool qualified = false;
|
bool qualified = false;
|
||||||
for (int32_t j = 0; j < pFilterInfo->numOfFilters; ++j) {
|
for (int32_t j = 0; j < pFilterInfo->numOfFilters; ++j) {
|
||||||
SColumnFilterElem *pFilterElem = &pFilterInfo->pFilters[j];
|
SColumnFilterElem *pFilterElem = &pFilterInfo->pFilters[j];
|
||||||
|
|
||||||
if (pFilterElem->fp(pFilterElem, pElem, pElem)) {
|
if (pFilterElem->fp(pFilterElem, pElem, pElem)) {
|
||||||
qualified = true;
|
qualified = true;
|
||||||
break;
|
break;
|
||||||
|
@ -824,7 +824,7 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3
|
||||||
SArray *pDataBlock) {
|
SArray *pDataBlock) {
|
||||||
char *dataBlock = NULL;
|
char *dataBlock = NULL;
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
|
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
|
||||||
|
|
||||||
int32_t functionId = pQuery->pSelectExpr[col].base.functionId;
|
int32_t functionId = pQuery->pSelectExpr[col].base.functionId;
|
||||||
|
@ -837,18 +837,18 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3
|
||||||
} else {
|
} else {
|
||||||
pCtx->startOffset = pQuery->pos - (size - 1);
|
pCtx->startOffset = pQuery->pos - (size - 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
sas->offset = 0;
|
sas->offset = 0;
|
||||||
sas->colList = pQuery->colList;
|
sas->colList = pQuery->colList;
|
||||||
sas->numOfCols = pQuery->numOfCols;
|
sas->numOfCols = pQuery->numOfCols;
|
||||||
sas->data = calloc(pQuery->numOfCols, POINTER_BYTES);
|
sas->data = calloc(pQuery->numOfCols, POINTER_BYTES);
|
||||||
|
|
||||||
// here the pQuery->colList and sas->colList are identical
|
// here the pQuery->colList and sas->colList are identical
|
||||||
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
|
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
|
||||||
SColumnInfo *pColMsg = &pQuery->colList[i];
|
SColumnInfo *pColMsg = &pQuery->colList[i];
|
||||||
|
|
||||||
int32_t numOfCols = taosArrayGetSize(pDataBlock);
|
int32_t numOfCols = taosArrayGetSize(pDataBlock);
|
||||||
|
|
||||||
dataBlock = NULL;
|
dataBlock = NULL;
|
||||||
for (int32_t k = 0; k < numOfCols; ++k) { //todo refactor
|
for (int32_t k = 0; k < numOfCols; ++k) { //todo refactor
|
||||||
SColumnInfoData *p = taosArrayGet(pDataBlock, k);
|
SColumnInfoData *p = taosArrayGet(pDataBlock, k);
|
||||||
|
@ -857,7 +857,7 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(dataBlock != NULL);
|
assert(dataBlock != NULL);
|
||||||
sas->data[i] = dataBlock + pCtx->startOffset * pQuery->colList[i].bytes; // start from the offset
|
sas->data[i] = dataBlock + pCtx->startOffset * pQuery->colList[i].bytes; // start from the offset
|
||||||
}
|
}
|
||||||
|
@ -873,7 +873,7 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3
|
||||||
* column in cache with the corresponding meter schema is reinforced.
|
* column in cache with the corresponding meter schema is reinforced.
|
||||||
*/
|
*/
|
||||||
int32_t numOfCols = taosArrayGetSize(pDataBlock);
|
int32_t numOfCols = taosArrayGetSize(pDataBlock);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColumnInfoData *p = taosArrayGet(pDataBlock, i);
|
SColumnInfoData *p = taosArrayGet(pDataBlock, i);
|
||||||
if (pCol->colId == p->info.colId) {
|
if (pCol->colId == p->info.colId) {
|
||||||
|
@ -912,7 +912,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
|
||||||
}
|
}
|
||||||
|
|
||||||
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
|
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
|
||||||
|
|
||||||
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
|
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
|
||||||
int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
|
int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
|
||||||
|
|
||||||
|
@ -982,10 +982,10 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
|
||||||
if (pQuery->pSelectExpr[i].base.functionId != TSDB_FUNC_ARITHM) {
|
if (pQuery->pSelectExpr[i].base.functionId != TSDB_FUNC_ARITHM) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
tfree(sasArray[i].data);
|
tfree(sasArray[i].data);
|
||||||
}
|
}
|
||||||
|
|
||||||
tfree(sasArray);
|
tfree(sasArray);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1045,7 +1045,7 @@ static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes,
|
||||||
* column in cache with the corresponding meter schema is reinforced.
|
* column in cache with the corresponding meter schema is reinforced.
|
||||||
*/
|
*/
|
||||||
int32_t numOfCols = taosArrayGetSize(pDataBlock);
|
int32_t numOfCols = taosArrayGetSize(pDataBlock);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColumnInfoData *p = taosArrayGet(pDataBlock, i);
|
SColumnInfoData *p = taosArrayGet(pDataBlock, i);
|
||||||
if (pColIndex->colId == p->info.colId) {
|
if (pColIndex->colId == p->info.colId) {
|
||||||
|
@ -1053,7 +1053,7 @@ static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1113,7 +1113,7 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
|
||||||
static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo,
|
static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo,
|
||||||
SWindowResInfo *pWindowResInfo, SArray *pDataBlock) {
|
SWindowResInfo *pWindowResInfo, SArray *pDataBlock) {
|
||||||
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
|
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
|
||||||
|
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
TSKEY *primaryKeyCol = (TSKEY*) ((SColumnInfoData *)taosArrayGet(pDataBlock, 0))->pData;
|
TSKEY *primaryKeyCol = (TSKEY*) ((SColumnInfoData *)taosArrayGet(pDataBlock, 0))->pData;
|
||||||
|
|
||||||
|
@ -1253,18 +1253,18 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pQuery->lastKey = lastKey + step;
|
pQuery->lastKey = lastKey + step;
|
||||||
|
|
||||||
// todo refactor: extract method
|
// todo refactor: extract method
|
||||||
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||||
if (pQuery->pSelectExpr[i].base.functionId != TSDB_FUNC_ARITHM) {
|
if (pQuery->pSelectExpr[i].base.functionId != TSDB_FUNC_ARITHM) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
tfree(sasArray[i].data);
|
tfree(sasArray[i].data);
|
||||||
}
|
}
|
||||||
|
|
||||||
free(sasArray);
|
free(sasArray);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1423,7 +1423,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
|
||||||
|
|
||||||
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
|
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
|
||||||
SColIndex* pIndex = &pSqlFuncMsg->colInfo;
|
SColIndex* pIndex = &pSqlFuncMsg->colInfo;
|
||||||
|
|
||||||
int32_t index = pSqlFuncMsg->colInfo.colIndex;
|
int32_t index = pSqlFuncMsg->colInfo.colIndex;
|
||||||
if (TSDB_COL_IS_TAG(pIndex->flag)) {
|
if (TSDB_COL_IS_TAG(pIndex->flag)) {
|
||||||
if (pIndex->colId == TSDB_TBNAME_COLUMN_INDEX) {
|
if (pIndex->colId == TSDB_TBNAME_COLUMN_INDEX) {
|
||||||
|
@ -1437,7 +1437,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
|
||||||
pCtx->inputBytes = pQuery->colList[index].bytes;
|
pCtx->inputBytes = pQuery->colList[index].bytes;
|
||||||
pCtx->inputType = pQuery->colList[index].type;
|
pCtx->inputType = pQuery->colList[index].type;
|
||||||
}
|
}
|
||||||
|
|
||||||
pCtx->ptsOutputBuf = NULL;
|
pCtx->ptsOutputBuf = NULL;
|
||||||
|
|
||||||
pCtx->outputBytes = pQuery->pSelectExpr[i].bytes;
|
pCtx->outputBytes = pQuery->pSelectExpr[i].bytes;
|
||||||
|
@ -1502,7 +1502,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
|
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv);
|
SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv);
|
||||||
|
|
||||||
qTrace("QInfo:%p teardown runtime env", pQInfo);
|
qTrace("QInfo:%p teardown runtime env", pQInfo);
|
||||||
cleanupTimeWindowInfo(&pRuntimeEnv->windowResInfo, pQuery->numOfOutput);
|
cleanupTimeWindowInfo(&pRuntimeEnv->windowResInfo, pQuery->numOfOutput);
|
||||||
|
|
||||||
|
@ -1552,7 +1552,7 @@ static bool isQueryKilled(SQInfo *pQInfo) {
|
||||||
pQInfo->killed = 1;
|
pQInfo->killed = 1;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
return (pQInfo->killed == 1);
|
return (pQInfo->killed == 1);
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
@ -1657,7 +1657,7 @@ static bool onlyQueryTags(SQuery* pQuery) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1703,7 +1703,7 @@ static UNUSED_FUNC bool doGetQueryPos(TSKEY key, SQInfo *pQInfo, SPointInterpoSu
|
||||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||||
SMeterObj * pMeterObj = pRuntimeEnv->pTabObj;
|
SMeterObj * pMeterObj = pRuntimeEnv->pTabObj;
|
||||||
|
|
||||||
/* key in query range. If not, no qualified in disk file */
|
/* key in query range. If not, no qualified in disk file */
|
||||||
if (key != -1 && key <= pQuery->window.ekey) {
|
if (key != -1 && key <= pQuery->window.ekey) {
|
||||||
if (isPointInterpoQuery(pQuery)) { /* no qualified data in this query range */
|
if (isPointInterpoQuery(pQuery)) { /* no qualified data in this query range */
|
||||||
|
@ -2164,10 +2164,10 @@ static UNUSED_FUNC void allocMemForInterpo(SQInfo *pQInfo, SQuery *pQuery, void
|
||||||
#if 0
|
#if 0
|
||||||
if (pQuery->interpoType != TSDB_INTERPO_NONE) {
|
if (pQuery->interpoType != TSDB_INTERPO_NONE) {
|
||||||
assert(isIntervalQuery(pQuery) || (pQuery->intervalTime == 0 && isPointInterpoQuery(pQuery)));
|
assert(isIntervalQuery(pQuery) || (pQuery->intervalTime == 0 && isPointInterpoQuery(pQuery)));
|
||||||
|
|
||||||
if (isIntervalQuery(pQuery)) {
|
if (isIntervalQuery(pQuery)) {
|
||||||
pQInfo->runtimeEnv.pInterpoBuf = malloc(POINTER_BYTES * pQuery->numOfOutput);
|
pQInfo->runtimeEnv.pInterpoBuf = malloc(POINTER_BYTES * pQuery->numOfOutput);
|
||||||
|
|
||||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||||
pQInfo->runtimeEnv.pInterpoBuf[i] =
|
pQInfo->runtimeEnv.pInterpoBuf[i] =
|
||||||
calloc(1, sizeof(tFilePage) + pQuery->pSelectExpr[i].bytes * pMeterObj->pointsPerFileBlock);
|
calloc(1, sizeof(tFilePage) + pQuery->pSelectExpr[i].bytes * pMeterObj->pointsPerFileBlock);
|
||||||
|
@ -2243,14 +2243,14 @@ UNUSED_FUNC void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
|
||||||
for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) {
|
for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) {
|
||||||
SMeterObj *pMeter = getMeterObj(pQInfo->groupInfo, pQInfo->pSidSet->pTableIdList[i]->sid);
|
SMeterObj *pMeter = getMeterObj(pQInfo->groupInfo, pQInfo->pSidSet->pTableIdList[i]->sid);
|
||||||
atomic_fetch_sub_32(&(pMeter->numOfQueries), 1);
|
atomic_fetch_sub_32(&(pMeter->numOfQueries), 1);
|
||||||
|
|
||||||
if (pMeter->numOfQueries > 0) {
|
if (pMeter->numOfQueries > 0) {
|
||||||
qTrace("QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d", pQInfo, pMeter->vnode, pMeter->sid,
|
qTrace("QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d", pQInfo, pMeter->vnode, pMeter->sid,
|
||||||
pMeter->meterId, pMeter->numOfQueries);
|
pMeter->meterId, pMeter->numOfQueries);
|
||||||
num++;
|
num++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* in order to reduce log output, for all meters of which numOfQueries count are 0,
|
* in order to reduce log output, for all meters of which numOfQueries count are 0,
|
||||||
* we do not output corresponding information
|
* we do not output corresponding information
|
||||||
|
@ -2272,26 +2272,26 @@ static bool needToLoadDataBlock(SQuery *pQuery, SDataStatis *pDataStatis, SQLFun
|
||||||
for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
|
for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
|
||||||
SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k];
|
SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k];
|
||||||
int32_t colIndex = pFilterInfo->info.colIndex;
|
int32_t colIndex = pFilterInfo->info.colIndex;
|
||||||
|
|
||||||
// this column not valid in current data block
|
// this column not valid in current data block
|
||||||
if (colIndex < 0 || pDataStatis[colIndex].colId != pFilterInfo->info.data.colId) {
|
if (colIndex < 0 || pDataStatis[colIndex].colId != pFilterInfo->info.data.colId) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// not support pre-filter operation on binary/nchar data type
|
// not support pre-filter operation on binary/nchar data type
|
||||||
if (!vnodeSupportPrefilter(pFilterInfo->info.data.type)) {
|
if (!vnodeSupportPrefilter(pFilterInfo->info.data.type)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// all points in current column are NULL, no need to check its boundary value
|
// all points in current column are NULL, no need to check its boundary value
|
||||||
if (pDataStatis[colIndex].numOfNull == numOfTotalPoints) {
|
if (pDataStatis[colIndex].numOfNull == numOfTotalPoints) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pFilterInfo->info.info.type == TSDB_DATA_TYPE_FLOAT) {
|
if (pFilterInfo->info.info.type == TSDB_DATA_TYPE_FLOAT) {
|
||||||
float minval = *(double *)(&pDataStatis[colIndex].min);
|
float minval = *(double *)(&pDataStatis[colIndex].min);
|
||||||
float maxval = *(double *)(&pDataStatis[colIndex].max);
|
float maxval = *(double *)(&pDataStatis[colIndex].max);
|
||||||
|
|
||||||
for (int32_t i = 0; i < pFilterInfo->numOfFilters; ++i) {
|
for (int32_t i = 0; i < pFilterInfo->numOfFilters; ++i) {
|
||||||
if (pFilterInfo->pFilters[i].fp(&pFilterInfo->pFilters[i], (char *)&minval, (char *)&maxval)) {
|
if (pFilterInfo->pFilters[i].fp(&pFilterInfo->pFilters[i], (char *)&minval, (char *)&maxval)) {
|
||||||
return true;
|
return true;
|
||||||
|
@ -2306,7 +2306,7 @@ static bool needToLoadDataBlock(SQuery *pQuery, SDataStatis *pDataStatis, SQLFun
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo disable this opt code block temporarily
|
// todo disable this opt code block temporarily
|
||||||
// for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
// for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||||
// int32_t functId = pQuery->pSelectExpr[i].base.functionId;
|
// int32_t functId = pQuery->pSelectExpr[i].base.functionId;
|
||||||
|
@ -2561,9 +2561,9 @@ static void doSetTagValueInParam(void *tsdb, STableId* pTableId, int32_t tagColI
|
||||||
} else {
|
} else {
|
||||||
tsdbGetTableTagVal(tsdb, pTableId, tagColId, &type, &bytes, &val);
|
tsdbGetTableTagVal(tsdb, pTableId, tagColId, &type, &bytes, &val);
|
||||||
}
|
}
|
||||||
|
|
||||||
tVariantCreateFromBinary(param, val, bytes, type);
|
tVariantCreateFromBinary(param, val, bytes, type);
|
||||||
|
|
||||||
if (tagColId == TSDB_TBNAME_COLUMN_INDEX) {
|
if (tagColId == TSDB_TBNAME_COLUMN_INDEX) {
|
||||||
tfree(val);
|
tfree(val);
|
||||||
}
|
}
|
||||||
|
@ -2711,17 +2711,17 @@ void UNUSED_FUNC displayInterResult(SData **pdata, SQuery *pQuery, int32_t numOf
|
||||||
#if 0
|
#if 0
|
||||||
int32_t numOfCols = pQuery->numOfOutput;
|
int32_t numOfCols = pQuery->numOfOutput;
|
||||||
printf("super table query intermediate result, total:%d\n", numOfRows);
|
printf("super table query intermediate result, total:%d\n", numOfRows);
|
||||||
|
|
||||||
SQInfo * pQInfo = (SQInfo *)(GET_QINFO_ADDR(pQuery));
|
SQInfo * pQInfo = (SQInfo *)(GET_QINFO_ADDR(pQuery));
|
||||||
SMeterObj *pMeterObj = pQInfo->pObj;
|
SMeterObj *pMeterObj = pQInfo->pObj;
|
||||||
|
|
||||||
for (int32_t j = 0; j < numOfRows; ++j) {
|
for (int32_t j = 0; j < numOfRows; ++j) {
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
switch (pQuery->pSelectExpr[i].type) {
|
switch (pQuery->pSelectExpr[i].type) {
|
||||||
case TSDB_DATA_TYPE_BINARY: {
|
case TSDB_DATA_TYPE_BINARY: {
|
||||||
int32_t colIndex = pQuery->pSelectExpr[i].base.colInfo.colIndex;
|
int32_t colIndex = pQuery->pSelectExpr[i].base.colInfo.colIndex;
|
||||||
int32_t type = 0;
|
int32_t type = 0;
|
||||||
|
|
||||||
if (TSDB_COL_IS_TAG(pQuery->pSelectExpr[i].base.colInfo.flag)) {
|
if (TSDB_COL_IS_TAG(pQuery->pSelectExpr[i].base.colInfo.flag)) {
|
||||||
type = pQuery->pSelectExpr[i].type;
|
type = pQuery->pSelectExpr[i].type;
|
||||||
} else {
|
} else {
|
||||||
|
@ -3261,7 +3261,7 @@ void skipResults(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||||
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
|
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
|
||||||
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
|
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
|
||||||
|
|
||||||
memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->rec.rows * bytes);
|
memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->rec.rows * bytes);
|
||||||
pRuntimeEnv->pCtx[i].aOutputBuf += bytes * numOfSkip;
|
pRuntimeEnv->pCtx[i].aOutputBuf += bytes * numOfSkip;
|
||||||
|
|
||||||
|
@ -3847,7 +3847,7 @@ void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo
|
||||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||||
SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo;
|
SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo;
|
||||||
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1;
|
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1;
|
||||||
|
|
||||||
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) {
|
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) {
|
||||||
rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock);
|
rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock);
|
||||||
} else {
|
} else {
|
||||||
|
@ -3903,31 +3903,31 @@ static UNUSED_FUNC int32_t resultInterpolate(SQInfo *pQInfo, tFilePage **data, t
|
||||||
#if 0
|
#if 0
|
||||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||||
SQuery *pQuery = &pRuntimeEnv->pQuery;
|
SQuery *pQuery = &pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
assert(pRuntimeEnv->pCtx[0].outputBytes == TSDB_KEYSIZE);
|
assert(pRuntimeEnv->pCtx[0].outputBytes == TSDB_KEYSIZE);
|
||||||
|
|
||||||
// build support structure for performing interpolation
|
// build support structure for performing interpolation
|
||||||
SSchema *pSchema = calloc(1, sizeof(SSchema) * pQuery->numOfOutput);
|
SSchema *pSchema = calloc(1, sizeof(SSchema) * pQuery->numOfOutput);
|
||||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||||
pSchema[i].bytes = pRuntimeEnv->pCtx[i].outputBytes;
|
pSchema[i].bytes = pRuntimeEnv->pCtx[i].outputBytes;
|
||||||
pSchema[i].type = pQuery->pSelectExpr[i].type;
|
pSchema[i].type = pQuery->pSelectExpr[i].type;
|
||||||
}
|
}
|
||||||
|
|
||||||
// SColumnModel *pModel = createColumnModel(pSchema, pQuery->numOfOutput, pQuery->pointsToRead);
|
// SColumnModel *pModel = createColumnModel(pSchema, pQuery->numOfOutput, pQuery->pointsToRead);
|
||||||
|
|
||||||
char * srcData[TSDB_MAX_COLUMNS] = {0};
|
char * srcData[TSDB_MAX_COLUMNS] = {0};
|
||||||
int32_t functions[TSDB_MAX_COLUMNS] = {0};
|
int32_t functions[TSDB_MAX_COLUMNS] = {0};
|
||||||
|
|
||||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||||
srcData[i] = pDataSrc[i]->data;
|
srcData[i] = pDataSrc[i]->data;
|
||||||
functions[i] = pQuery->pSelectExpr[i].base.functionId;
|
functions[i] = pQuery->pSelectExpr[i].base.functionId;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(0);
|
assert(0);
|
||||||
// int32_t numOfRes = taosDoInterpoResult(&pRuntimeEnv->interpoInfo, pQuery->interpoType, data, numOfRows, outputRows,
|
// int32_t numOfRes = taosDoInterpoResult(&pRuntimeEnv->interpoInfo, pQuery->interpoType, data, numOfRows, outputRows,
|
||||||
// pQuery->intervalTime, (int64_t *)pDataSrc[0]->data, pModel, srcData,
|
// pQuery->intervalTime, (int64_t *)pDataSrc[0]->data, pModel, srcData,
|
||||||
// pQuery->defaultVal, functions, pRuntimeEnv->pTabObj->pointsPerFileBlock);
|
// pQuery->defaultVal, functions, pRuntimeEnv->pTabObj->pointsPerFileBlock);
|
||||||
|
|
||||||
destroyColumnModel(pModel);
|
destroyColumnModel(pModel);
|
||||||
free(pSchema);
|
free(pSchema);
|
||||||
#endif
|
#endif
|
||||||
|
@ -3956,20 +3956,20 @@ int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage
|
||||||
#if 0
|
#if 0
|
||||||
while (1) {
|
while (1) {
|
||||||
numOfRows = taosNumOfRemainPoints(&pRuntimeEnv->interpoInfo);
|
numOfRows = taosNumOfRemainPoints(&pRuntimeEnv->interpoInfo);
|
||||||
|
|
||||||
TSKEY ekey = taosGetRevisedEndKey(pQuery->window.skey, pQuery->order.order, pQuery->intervalTime,
|
TSKEY ekey = taosGetRevisedEndKey(pQuery->window.skey, pQuery->order.order, pQuery->intervalTime,
|
||||||
pQuery->slidingTimeUnit, pQuery->precision);
|
pQuery->slidingTimeUnit, pQuery->precision);
|
||||||
int32_t numOfFinalRows = taosGetNumOfResultWithInterpo(&pRuntimeEnv->interpoInfo, (TSKEY *)pDataSrc[0]->data,
|
int32_t numOfFinalRows = taosGetNumOfResultWithInterpo(&pRuntimeEnv->interpoInfo, (TSKEY *)pDataSrc[0]->data,
|
||||||
numOfRows, pQuery->intervalTime, ekey, pQuery->pointsToRead);
|
numOfRows, pQuery->intervalTime, ekey, pQuery->pointsToRead);
|
||||||
|
|
||||||
int32_t ret = resultInterpolate(pQInfo, pDst, pDataSrc, numOfRows, numOfFinalRows);
|
int32_t ret = resultInterpolate(pQInfo, pDst, pDataSrc, numOfRows, numOfFinalRows);
|
||||||
assert(ret == numOfFinalRows);
|
assert(ret == numOfFinalRows);
|
||||||
|
|
||||||
/* reached the start position of according to offset value, return immediately */
|
/* reached the start position of according to offset value, return immediately */
|
||||||
if (pQuery->limit.offset == 0) {
|
if (pQuery->limit.offset == 0) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pQuery->limit.offset < ret) {
|
if (pQuery->limit.offset < ret) {
|
||||||
ret -= pQuery->limit.offset;
|
ret -= pQuery->limit.offset;
|
||||||
// todo !!!!there exactly number of interpo is not valid.
|
// todo !!!!there exactly number of interpo is not valid.
|
||||||
|
@ -3984,7 +3984,7 @@ int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage
|
||||||
pQuery->limit.offset -= ret;
|
pQuery->limit.offset -= ret;
|
||||||
ret = 0;
|
ret = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!vnodeHasRemainResults(pQInfo)) {
|
if (!vnodeHasRemainResults(pQInfo)) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -4006,31 +4006,31 @@ void vnodePrintQueryStatistics(SQInfo *pQInfo) {
|
||||||
} else {
|
} else {
|
||||||
pSummary->tmpBufferInDisk = getResBufSize(pRuntimeEnv->pResultBuf);
|
pSummary->tmpBufferInDisk = getResBufSize(pRuntimeEnv->pResultBuf);
|
||||||
}
|
}
|
||||||
|
|
||||||
qTrace("QInfo:%p statis: comp blocks:%d, size:%d Bytes, elapsed time:%.2f ms", pQInfo, pSummary->readCompInfo,
|
qTrace("QInfo:%p statis: comp blocks:%d, size:%d Bytes, elapsed time:%.2f ms", pQInfo, pSummary->readCompInfo,
|
||||||
pSummary->totalCompInfoSize, pSummary->loadCompInfoUs / 1000.0);
|
pSummary->totalCompInfoSize, pSummary->loadCompInfoUs / 1000.0);
|
||||||
|
|
||||||
qTrace("QInfo:%p statis: field info: %d, size:%d Bytes, avg size:%.2f Bytes, elapsed time:%.2f ms", pQInfo,
|
qTrace("QInfo:%p statis: field info: %d, size:%d Bytes, avg size:%.2f Bytes, elapsed time:%.2f ms", pQInfo,
|
||||||
pSummary->readField, pSummary->totalFieldSize, (double)pSummary->totalFieldSize / pSummary->readField,
|
pSummary->readField, pSummary->totalFieldSize, (double)pSummary->totalFieldSize / pSummary->readField,
|
||||||
pSummary->loadFieldUs / 1000.0);
|
pSummary->loadFieldUs / 1000.0);
|
||||||
|
|
||||||
qTrace(
|
qTrace(
|
||||||
"QInfo:%p statis: file blocks:%d, size:%d Bytes, elapsed time:%.2f ms, skipped:%d, in-memory gen null:%d Bytes",
|
"QInfo:%p statis: file blocks:%d, size:%d Bytes, elapsed time:%.2f ms, skipped:%d, in-memory gen null:%d Bytes",
|
||||||
pQInfo, pSummary->readDiskBlocks, pSummary->totalBlockSize, pSummary->loadBlocksUs / 1000.0,
|
pQInfo, pSummary->readDiskBlocks, pSummary->totalBlockSize, pSummary->loadBlocksUs / 1000.0,
|
||||||
pSummary->skippedFileBlocks, pSummary->totalGenData);
|
pSummary->skippedFileBlocks, pSummary->totalGenData);
|
||||||
|
|
||||||
qTrace("QInfo:%p statis: cache blocks:%d", pQInfo, pSummary->blocksInCache, 0);
|
qTrace("QInfo:%p statis: cache blocks:%d", pQInfo, pSummary->blocksInCache, 0);
|
||||||
qTrace("QInfo:%p statis: temp file:%d Bytes", pQInfo, pSummary->tmpBufferInDisk);
|
qTrace("QInfo:%p statis: temp file:%d Bytes", pQInfo, pSummary->tmpBufferInDisk);
|
||||||
|
|
||||||
qTrace("QInfo:%p statis: file:%d, table:%d", pQInfo, pSummary->numOfFiles, pSummary->numOfTables);
|
qTrace("QInfo:%p statis: file:%d, table:%d", pQInfo, pSummary->numOfFiles, pSummary->numOfTables);
|
||||||
qTrace("QInfo:%p statis: seek ops:%d", pQInfo, pSummary->numOfSeek);
|
qTrace("QInfo:%p statis: seek ops:%d", pQInfo, pSummary->numOfSeek);
|
||||||
|
|
||||||
double total = pSummary->fileTimeUs + pSummary->cacheTimeUs;
|
double total = pSummary->fileTimeUs + pSummary->cacheTimeUs;
|
||||||
double io = pSummary->loadCompInfoUs + pSummary->loadBlocksUs + pSummary->loadFieldUs;
|
double io = pSummary->loadCompInfoUs + pSummary->loadBlocksUs + pSummary->loadFieldUs;
|
||||||
|
|
||||||
// todo add the intermediate result save cost!!
|
// todo add the intermediate result save cost!!
|
||||||
double computing = total - io;
|
double computing = total - io;
|
||||||
|
|
||||||
qTrace(
|
qTrace(
|
||||||
"QInfo:%p statis: total elapsed time:%.2f ms, file:%.2f ms(%.2f%), cache:%.2f ms(%.2f%). io:%.2f ms(%.2f%),"
|
"QInfo:%p statis: total elapsed time:%.2f ms, file:%.2f ms(%.2f%), cache:%.2f ms(%.2f%). io:%.2f ms(%.2f%),"
|
||||||
"comput:%.2fms(%.2f%)",
|
"comput:%.2fms(%.2f%)",
|
||||||
|
@ -4230,15 +4230,15 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool
|
||||||
.colList = pQuery->colList,
|
.colList = pQuery->colList,
|
||||||
.numOfCols = pQuery->numOfCols,
|
.numOfCols = pQuery->numOfCols,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
// normal query setup the queryhandle here
|
// normal query setup the queryhandle here
|
||||||
if (isFirstLastRowQuery(pQuery) && !isSTableQuery) { // in case of last_row query, invoke a different API.
|
if (isFirstLastRowQuery(pQuery) && !isSTableQuery) { // in case of last_row query, invoke a different API.
|
||||||
pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableIdGroupInfo);
|
pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableIdGroupInfo);
|
||||||
} else if (!isSTableQuery || isIntervalQuery(pQuery) || isFixedOutputQuery(pQuery)) {
|
} else if (!isSTableQuery || isIntervalQuery(pQuery) || isFixedOutputQuery(pQuery)) {
|
||||||
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableIdGroupInfo);
|
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableIdGroupInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
pQInfo->tsdb = tsdb;
|
pQInfo->tsdb = tsdb;
|
||||||
pQInfo->vgId = vgId;
|
pQInfo->vgId = vgId;
|
||||||
|
|
||||||
|
@ -4388,11 +4388,11 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
|
||||||
if (pInfo->id.tid == blockInfo.tid) {
|
if (pInfo->id.tid == blockInfo.tid) {
|
||||||
assert(pInfo->id.uid == blockInfo.uid);
|
assert(pInfo->id.uid == blockInfo.uid);
|
||||||
pTableQueryInfo = item->info;
|
pTableQueryInfo = item->info;
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTableQueryInfo != NULL) {
|
if (pTableQueryInfo != NULL) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -4456,11 +4456,11 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
|
||||||
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
|
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
|
||||||
pRuntimeEnv->pQueryHandle = NULL;
|
pRuntimeEnv->pQueryHandle = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp);
|
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp);
|
||||||
taosArrayDestroy(tx);
|
taosArrayDestroy(tx);
|
||||||
taosArrayDestroy(g1);
|
taosArrayDestroy(g1);
|
||||||
|
|
||||||
if (pRuntimeEnv->pTSBuf != NULL) {
|
if (pRuntimeEnv->pTSBuf != NULL) {
|
||||||
if (pRuntimeEnv->cur.vgroupIndex == -1) {
|
if (pRuntimeEnv->cur.vgroupIndex == -1) {
|
||||||
int64_t tag = pRuntimeEnv->pCtx[0].tag.i64Key;
|
int64_t tag = pRuntimeEnv->pCtx[0].tag.i64Key;
|
||||||
|
@ -4538,41 +4538,41 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
||||||
if (isFirstLastRowQuery(pQuery)) {
|
if (isFirstLastRowQuery(pQuery)) {
|
||||||
qTrace("QInfo:%p last_row query on group:%d, total group:%d, current group:%d", pQInfo, pQInfo->groupIndex,
|
qTrace("QInfo:%p last_row query on group:%d, total group:%d, current group:%d", pQInfo, pQInfo->groupIndex,
|
||||||
numOfGroups);
|
numOfGroups);
|
||||||
|
|
||||||
STsdbQueryCond cond = {
|
STsdbQueryCond cond = {
|
||||||
.twindow = pQuery->window,
|
.twindow = pQuery->window,
|
||||||
.colList = pQuery->colList,
|
.colList = pQuery->colList,
|
||||||
.order = pQuery->order.order,
|
.order = pQuery->order.order,
|
||||||
.numOfCols = pQuery->numOfCols,
|
.numOfCols = pQuery->numOfCols,
|
||||||
};
|
};
|
||||||
|
|
||||||
SArray *g1 = taosArrayInit(1, POINTER_BYTES);
|
SArray *g1 = taosArrayInit(1, POINTER_BYTES);
|
||||||
SArray *tx = taosArrayClone(group);
|
SArray *tx = taosArrayClone(group);
|
||||||
taosArrayPush(g1, &tx);
|
taosArrayPush(g1, &tx);
|
||||||
|
|
||||||
STableGroupInfo gp = {.numOfTables = taosArrayGetSize(tx), .pGroupList = g1};
|
STableGroupInfo gp = {.numOfTables = taosArrayGetSize(tx), .pGroupList = g1};
|
||||||
|
|
||||||
// include only current table
|
// include only current table
|
||||||
if (pRuntimeEnv->pQueryHandle != NULL) {
|
if (pRuntimeEnv->pQueryHandle != NULL) {
|
||||||
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
|
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
|
||||||
pRuntimeEnv->pQueryHandle = NULL;
|
pRuntimeEnv->pQueryHandle = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(pQInfo->tsdb, &cond, &gp);
|
pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(pQInfo->tsdb, &cond, &gp);
|
||||||
|
|
||||||
initCtxOutputBuf(pRuntimeEnv);
|
initCtxOutputBuf(pRuntimeEnv);
|
||||||
setTagVal(pRuntimeEnv, (STableId*) taosArrayGet(tx, 0), pQInfo->tsdb);
|
setTagVal(pRuntimeEnv, (STableId*) taosArrayGet(tx, 0), pQInfo->tsdb);
|
||||||
scanAllDataBlocks(pRuntimeEnv);
|
scanAllDataBlocks(pRuntimeEnv);
|
||||||
|
|
||||||
int64_t numOfRes = getNumOfResult(pRuntimeEnv);
|
int64_t numOfRes = getNumOfResult(pRuntimeEnv);
|
||||||
if (numOfRes > 0) {
|
if (numOfRes > 0) {
|
||||||
pQuery->rec.rows += numOfRes;
|
pQuery->rec.rows += numOfRes;
|
||||||
forwardCtxOutputBuf(pRuntimeEnv, numOfRes);
|
forwardCtxOutputBuf(pRuntimeEnv, numOfRes);
|
||||||
}
|
}
|
||||||
|
|
||||||
skipResults(pRuntimeEnv);
|
skipResults(pRuntimeEnv);
|
||||||
pQInfo->groupIndex += 1;
|
pQInfo->groupIndex += 1;
|
||||||
|
|
||||||
// enable execution for next table, when handling the projection query
|
// enable execution for next table, when handling the projection query
|
||||||
enableExecutionForNextTable(pRuntimeEnv);
|
enableExecutionForNextTable(pRuntimeEnv);
|
||||||
}
|
}
|
||||||
|
@ -4612,7 +4612,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SGroupItem *item = taosArrayGet(group, pQInfo->tableIndex);
|
SGroupItem *item = taosArrayGet(group, pQInfo->tableIndex);
|
||||||
|
|
||||||
STableQueryInfo *pInfo = item->info;
|
STableQueryInfo *pInfo = item->info;
|
||||||
if (pInfo->lastKey > 0) {
|
if (pInfo->lastKey > 0) {
|
||||||
pQuery->window.skey = pInfo->lastKey;
|
pQuery->window.skey = pInfo->lastKey;
|
||||||
|
@ -4750,7 +4750,7 @@ static void createTableQueryInfo(SQInfo *pQInfo) {
|
||||||
STableQueryInfo* pInfo = createTableQueryInfoImpl(&pQInfo->runtimeEnv, item->id, pQuery->window);
|
STableQueryInfo* pInfo = createTableQueryInfoImpl(&pQInfo->runtimeEnv, item->id, pQuery->window);
|
||||||
pInfo->groupIdx = i;
|
pInfo->groupIdx = i;
|
||||||
pInfo->tableIndex = index;
|
pInfo->tableIndex = index;
|
||||||
|
|
||||||
item->info = pInfo;
|
item->info = pInfo;
|
||||||
index += 1;
|
index += 1;
|
||||||
}
|
}
|
||||||
|
@ -5179,22 +5179,22 @@ static void stableQueryImpl(SQInfo *pQInfo) {
|
||||||
|
|
||||||
static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg, SColumnInfo* pTagCols) {
|
static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg, SColumnInfo* pTagCols) {
|
||||||
int32_t j = 0;
|
int32_t j = 0;
|
||||||
|
|
||||||
if (TSDB_COL_IS_TAG(pExprMsg->colInfo.flag)) {
|
if (TSDB_COL_IS_TAG(pExprMsg->colInfo.flag)) {
|
||||||
while(j < pQueryMsg->numOfTags) {
|
while(j < pQueryMsg->numOfTags) {
|
||||||
if (pExprMsg->colInfo.colId == pTagCols[j].colId) {
|
if (pExprMsg->colInfo.colId == pTagCols[j].colId) {
|
||||||
return j;
|
return j;
|
||||||
}
|
}
|
||||||
|
|
||||||
j += 1;
|
j += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
while (j < pQueryMsg->numOfCols) {
|
while (j < pQueryMsg->numOfCols) {
|
||||||
if (pExprMsg->colInfo.colId == pQueryMsg->colList[j].colId) {
|
if (pExprMsg->colInfo.colId == pQueryMsg->colList[j].colId) {
|
||||||
return j;
|
return j;
|
||||||
}
|
}
|
||||||
|
|
||||||
j += 1;
|
j += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5243,7 +5243,7 @@ static bool validateQuerySourceCols(SQueryTableMsg *pQueryMsg, SSqlFuncMsg** pEx
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5395,10 +5395,10 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
|
||||||
|
|
||||||
pExprMsg = (SSqlFuncMsg *)pMsg;
|
pExprMsg = (SSqlFuncMsg *)pMsg;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!validateQuerySourceCols(pQueryMsg, *pExpr)) {
|
if (!validateQuerySourceCols(pQueryMsg, *pExpr)) {
|
||||||
tfree(*pExpr);
|
tfree(*pExpr);
|
||||||
|
|
||||||
return TSDB_CODE_INVALID_QUERY_MSG;
|
return TSDB_CODE_INVALID_QUERY_MSG;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5441,12 +5441,12 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
|
||||||
(*tagCols) = calloc(1, sizeof(SColumnInfo) * pQueryMsg->numOfTags);
|
(*tagCols) = calloc(1, sizeof(SColumnInfo) * pQueryMsg->numOfTags);
|
||||||
for (int32_t i = 0; i < pQueryMsg->numOfTags; ++i) {
|
for (int32_t i = 0; i < pQueryMsg->numOfTags; ++i) {
|
||||||
SColumnInfo* pTagCol = (SColumnInfo*) pMsg;
|
SColumnInfo* pTagCol = (SColumnInfo*) pMsg;
|
||||||
|
|
||||||
pTagCol->colId = htons(pTagCol->colId);
|
pTagCol->colId = htons(pTagCol->colId);
|
||||||
pTagCol->bytes = htons(pTagCol->bytes);
|
pTagCol->bytes = htons(pTagCol->bytes);
|
||||||
pTagCol->type = htons(pTagCol->type);
|
pTagCol->type = htons(pTagCol->type);
|
||||||
pTagCol->numOfFilters = 0;
|
pTagCol->numOfFilters = 0;
|
||||||
|
|
||||||
(*tagCols)[i] = *pTagCol;
|
(*tagCols)[i] = *pTagCol;
|
||||||
pMsg += sizeof(SColumnInfo);
|
pMsg += sizeof(SColumnInfo);
|
||||||
}
|
}
|
||||||
|
@ -5458,14 +5458,14 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
|
||||||
memcpy(*tagCond, pMsg, pQueryMsg->tagCondLen);
|
memcpy(*tagCond, pMsg, pQueryMsg->tagCondLen);
|
||||||
pMsg += pQueryMsg->tagCondLen;
|
pMsg += pQueryMsg->tagCondLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (*pMsg != 0) {
|
if (*pMsg != 0) {
|
||||||
size_t len = strlen(pMsg) + 1;
|
size_t len = strlen(pMsg) + 1;
|
||||||
*tbnameCond = malloc(len);
|
*tbnameCond = malloc(len);
|
||||||
strcpy(*tbnameCond, pMsg);
|
strcpy(*tbnameCond, pMsg);
|
||||||
pMsg += len;
|
pMsg += len;
|
||||||
}
|
}
|
||||||
|
|
||||||
qTrace("qmsg:%p query %d tables, qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, order:%d, "
|
qTrace("qmsg:%p query %d tables, qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, order:%d, "
|
||||||
"outputCols:%d, numOfCols:%d, interval:%" PRId64 ", fillType:%d, comptsLen:%d, limit:%" PRId64 ", offset:%" PRId64,
|
"outputCols:%d, numOfCols:%d, interval:%" PRId64 ", fillType:%d, comptsLen:%d, limit:%" PRId64 ", offset:%" PRId64,
|
||||||
pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols,
|
pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols,
|
||||||
|
@ -5490,7 +5490,7 @@ static int32_t buildAirthmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTable
|
||||||
qError("qmsg:%p failed to create arithmetic expression string from:%s", pQueryMsg, pArithExprInfo->base.arg[0].argValue.pz);
|
qError("qmsg:%p failed to create arithmetic expression string from:%s", pQueryMsg, pArithExprInfo->base.arg[0].argValue.pz);
|
||||||
return TSDB_CODE_APP_ERROR;
|
return TSDB_CODE_APP_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
pArithExprInfo->pExpr = pExprNode;
|
pArithExprInfo->pExpr = pExprNode;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -5557,7 +5557,7 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo
|
||||||
for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) {
|
for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) {
|
||||||
pExprs[i].base = *pExprMsg[i];
|
pExprs[i].base = *pExprMsg[i];
|
||||||
int16_t functId = pExprs[i].base.functionId;
|
int16_t functId = pExprs[i].base.functionId;
|
||||||
|
|
||||||
if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) {
|
if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) {
|
||||||
int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols);
|
int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols);
|
||||||
assert(j < pQueryMsg->numOfCols);
|
assert(j < pQueryMsg->numOfCols);
|
||||||
|
@ -5597,7 +5597,7 @@ static SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SCol
|
||||||
for(int32_t i = 0; i < pQueryMsg->numOfGroupCols; ++i) {
|
for(int32_t i = 0; i < pQueryMsg->numOfGroupCols; ++i) {
|
||||||
taosArrayPush(pGroupbyExpr->columnInfo, &pColIndex[i]);
|
taosArrayPush(pGroupbyExpr->columnInfo, &pColIndex[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
return pGroupbyExpr;
|
return pGroupbyExpr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5620,7 +5620,7 @@ static int32_t createFilterInfo(void *pQInfo, SQuery *pQuery) {
|
||||||
|
|
||||||
memcpy(&pFilterInfo->info, &pQuery->colList[i], sizeof(SColumnInfoData));
|
memcpy(&pFilterInfo->info, &pQuery->colList[i], sizeof(SColumnInfoData));
|
||||||
pFilterInfo->info = pQuery->colList[i];
|
pFilterInfo->info = pQuery->colList[i];
|
||||||
|
|
||||||
pFilterInfo->numOfFilters = pQuery->colList[i].numOfFilters;
|
pFilterInfo->numOfFilters = pQuery->colList[i].numOfFilters;
|
||||||
pFilterInfo->pFilters = calloc(pFilterInfo->numOfFilters, sizeof(SColumnFilterElem));
|
pFilterInfo->pFilters = calloc(pFilterInfo->numOfFilters, sizeof(SColumnFilterElem));
|
||||||
|
|
||||||
|
@ -5753,9 +5753,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
|
||||||
SColumnInfo *pColInfo = &pQuery->colList[i];
|
SColumnInfo *pColInfo = &pQuery->colList[i];
|
||||||
pColInfo->filters = tscFilterInfoClone(pQueryMsg->colList[i].filters, pColInfo->numOfFilters);
|
pColInfo->filters = tscFilterInfoClone(pQueryMsg->colList[i].filters, pColInfo->numOfFilters);
|
||||||
}
|
}
|
||||||
|
|
||||||
pQuery->tagColList = pTagCols;
|
pQuery->tagColList = pTagCols;
|
||||||
|
|
||||||
// calculate the result row size
|
// calculate the result row size
|
||||||
for (int16_t col = 0; col < numOfOutput; ++col) {
|
for (int16_t col = 0; col < numOfOutput; ++col) {
|
||||||
assert(pExprs[col].bytes > 0);
|
assert(pExprs[col].bytes > 0);
|
||||||
|
@ -5802,24 +5802,24 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
|
||||||
|
|
||||||
// to make sure third party won't overwrite this structure
|
// to make sure third party won't overwrite this structure
|
||||||
pQInfo->signature = pQInfo;
|
pQInfo->signature = pQInfo;
|
||||||
|
|
||||||
pQInfo->tableIdGroupInfo = *groupInfo;
|
pQInfo->tableIdGroupInfo = *groupInfo;
|
||||||
size_t numOfGroups = taosArrayGetSize(groupInfo->pGroupList);
|
size_t numOfGroups = taosArrayGetSize(groupInfo->pGroupList);
|
||||||
|
|
||||||
pQInfo->groupInfo.pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES);
|
pQInfo->groupInfo.pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES);
|
||||||
pQInfo->groupInfo.numOfTables = groupInfo->numOfTables;
|
pQInfo->groupInfo.numOfTables = groupInfo->numOfTables;
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfGroups; ++i) {
|
for(int32_t i = 0; i < numOfGroups; ++i) {
|
||||||
SArray* pa = taosArrayGetP(groupInfo->pGroupList, i);
|
SArray* pa = taosArrayGetP(groupInfo->pGroupList, i);
|
||||||
size_t s = taosArrayGetSize(pa);
|
size_t s = taosArrayGetSize(pa);
|
||||||
|
|
||||||
SArray* p1 = taosArrayInit(s, sizeof(SGroupItem));
|
SArray* p1 = taosArrayInit(s, sizeof(SGroupItem));
|
||||||
|
|
||||||
for(int32_t j = 0; j < s; ++j) {
|
for(int32_t j = 0; j < s; ++j) {
|
||||||
SGroupItem item = { .id = *(STableId*) taosArrayGet(pa, j), .info = NULL, };
|
SGroupItem item = { .id = *(STableId*) taosArrayGet(pa, j), .info = NULL, };
|
||||||
taosArrayPush(p1, &item);
|
taosArrayPush(p1, &item);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(pQInfo->groupInfo.pGroupList, &p1);
|
taosArrayPush(pQInfo->groupInfo.pGroupList, &p1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5957,7 +5957,7 @@ static void freeQInfo(SQInfo *pQInfo) {
|
||||||
int32_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
|
int32_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
|
||||||
for (int32_t i = 0; i < numOfGroups; ++i) {
|
for (int32_t i = 0; i < numOfGroups; ++i) {
|
||||||
SArray *p = taosArrayGetP(pQInfo->groupInfo.pGroupList, i);
|
SArray *p = taosArrayGetP(pQInfo->groupInfo.pGroupList, i);
|
||||||
|
|
||||||
size_t num = taosArrayGetSize(p);
|
size_t num = taosArrayGetSize(p);
|
||||||
for(int32_t j = 0; j < num; ++j) {
|
for(int32_t j = 0; j < num; ++j) {
|
||||||
SGroupItem* item = taosArrayGet(p, j);
|
SGroupItem* item = taosArrayGet(p, j);
|
||||||
|
@ -5965,31 +5965,31 @@ static void freeQInfo(SQInfo *pQInfo) {
|
||||||
destroyTableQueryInfo(item->info, pQuery->numOfOutput);
|
destroyTableQueryInfo(item->info, pQuery->numOfOutput);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(p);
|
taosArrayDestroy(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(pQInfo->groupInfo.pGroupList);
|
taosArrayDestroy(pQInfo->groupInfo.pGroupList);
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfGroups; ++i) {
|
for(int32_t i = 0; i < numOfGroups; ++i) {
|
||||||
SArray* p = taosArrayGetP(pQInfo->tableIdGroupInfo.pGroupList, i);
|
SArray* p = taosArrayGetP(pQInfo->tableIdGroupInfo.pGroupList, i);
|
||||||
taosArrayDestroy(p);
|
taosArrayDestroy(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(pQInfo->tableIdGroupInfo.pGroupList);
|
taosArrayDestroy(pQInfo->tableIdGroupInfo.pGroupList);
|
||||||
|
|
||||||
if (pQuery->pGroupbyExpr != NULL) {
|
if (pQuery->pGroupbyExpr != NULL) {
|
||||||
taosArrayDestroy(pQuery->pGroupbyExpr->columnInfo);
|
taosArrayDestroy(pQuery->pGroupbyExpr->columnInfo);
|
||||||
tfree(pQuery->pGroupbyExpr);
|
tfree(pQuery->pGroupbyExpr);
|
||||||
}
|
}
|
||||||
|
|
||||||
tfree(pQuery->tagColList);
|
tfree(pQuery->tagColList);
|
||||||
tfree(pQuery->pFilterInfo);
|
tfree(pQuery->pFilterInfo);
|
||||||
tfree(pQuery->colList);
|
tfree(pQuery->colList);
|
||||||
tfree(pQuery->sdata);
|
tfree(pQuery->sdata);
|
||||||
|
|
||||||
tfree(pQuery);
|
tfree(pQuery);
|
||||||
|
|
||||||
qTrace("QInfo:%p QInfo is freed", pQInfo);
|
qTrace("QInfo:%p QInfo is freed", pQInfo);
|
||||||
|
|
||||||
// destroy signature, in order to avoid the query process pass the object safety check
|
// destroy signature, in order to avoid the query process pass the object safety check
|
||||||
|
@ -6042,7 +6042,7 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
|
||||||
qError("QInfo:%p failed to open tmp file to send ts-comp data to client, path:%s, reason:%s", pQInfo,
|
qError("QInfo:%p failed to open tmp file to send ts-comp data to client, path:%s, reason:%s", pQInfo,
|
||||||
pQuery->sdata[0]->data, strerror(errno));
|
pQuery->sdata[0]->data, strerror(errno));
|
||||||
}
|
}
|
||||||
|
|
||||||
// all data returned, set query over
|
// all data returned, set query over
|
||||||
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
||||||
setQueryStatus(pQuery, QUERY_OVER);
|
setQueryStatus(pQuery, QUERY_OVER);
|
||||||
|
@ -6099,10 +6099,10 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
|
||||||
|
|
||||||
bool isSTableQuery = false;
|
bool isSTableQuery = false;
|
||||||
STableGroupInfo groupInfo = {0};
|
STableGroupInfo groupInfo = {0};
|
||||||
|
|
||||||
if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY|TSDB_QUERY_TYPE_TABLE_QUERY)) {
|
if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY|TSDB_QUERY_TYPE_TABLE_QUERY)) {
|
||||||
isSTableQuery = TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY);
|
isSTableQuery = TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY);
|
||||||
|
|
||||||
STableId *id = taosArrayGet(pTableIdList, 0);
|
STableId *id = taosArrayGet(pTableIdList, 0);
|
||||||
if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &groupInfo)) != TSDB_CODE_SUCCESS) {
|
if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &groupInfo)) != TSDB_CODE_SUCCESS) {
|
||||||
goto _query_over;
|
goto _query_over;
|
||||||
|
@ -6110,13 +6110,13 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
|
||||||
} else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_STABLE_QUERY)) {
|
} else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_STABLE_QUERY)) {
|
||||||
isSTableQuery = true;
|
isSTableQuery = true;
|
||||||
STableId *id = taosArrayGet(pTableIdList, 0);
|
STableId *id = taosArrayGet(pTableIdList, 0);
|
||||||
|
|
||||||
// group by normal column, do not pass the group by condition to tsdb to group table into different group
|
// group by normal column, do not pass the group by condition to tsdb to group table into different group
|
||||||
int32_t numOfGroupByCols = pQueryMsg->numOfGroupCols;
|
int32_t numOfGroupByCols = pQueryMsg->numOfGroupCols;
|
||||||
if (pQueryMsg->numOfGroupCols == 1 && !TSDB_COL_IS_TAG(pGroupColIndex->flag)) {
|
if (pQueryMsg->numOfGroupCols == 1 && !TSDB_COL_IS_TAG(pGroupColIndex->flag)) {
|
||||||
numOfGroupByCols = 0;
|
numOfGroupByCols = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo handle the error
|
// todo handle the error
|
||||||
/*int32_t ret =*/tsdbQuerySTableByTagCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, pQueryMsg->tagNameRelType, tbnameCond, &groupInfo, pGroupColIndex,
|
/*int32_t ret =*/tsdbQuerySTableByTagCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, pQueryMsg->tagNameRelType, tbnameCond, &groupInfo, pGroupColIndex,
|
||||||
numOfGroupByCols);
|
numOfGroupByCols);
|
||||||
|
@ -6131,6 +6131,7 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
|
||||||
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, &groupInfo, pTagColumnInfo);
|
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, &groupInfo, pTagColumnInfo);
|
||||||
if ((*pQInfo) == NULL) {
|
if ((*pQInfo) == NULL) {
|
||||||
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
|
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||||
|
goto _query_over;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = initQInfo(pQueryMsg, tsdb, vgId, *pQInfo, isSTableQuery);
|
code = initQInfo(pQueryMsg, tsdb, vgId, *pQInfo, isSTableQuery);
|
||||||
|
@ -6139,7 +6140,12 @@ _query_over:
|
||||||
tfree(tagCond);
|
tfree(tagCond);
|
||||||
tfree(tbnameCond);
|
tfree(tbnameCond);
|
||||||
taosArrayDestroy(pTableIdList);
|
taosArrayDestroy(pTableIdList);
|
||||||
|
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tfree(*pQInfo);
|
||||||
|
*pQInfo = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
// if failed to add ref for all meters in this query, abort current query
|
// if failed to add ref for all meters in this query, abort current query
|
||||||
// atomic_fetch_add_32(&vnodeSelectReqNum, 1);
|
// atomic_fetch_add_32(&vnodeSelectReqNum, 1);
|
||||||
return code;
|
return code;
|
||||||
|
@ -6164,7 +6170,7 @@ void qTableQuery(qinfo_t qinfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
qTrace("QInfo:%p query task is launched", pQInfo);
|
qTrace("QInfo:%p query task is launched", pQInfo);
|
||||||
|
|
||||||
if (onlyQueryTags(pQInfo->runtimeEnv.pQuery)) {
|
if (onlyQueryTags(pQInfo->runtimeEnv.pQuery)) {
|
||||||
buildTagQueryResult(pQInfo); // todo support the limit/offset
|
buildTagQueryResult(pQInfo); // todo support the limit/offset
|
||||||
} else if (pQInfo->runtimeEnv.stableQuery) {
|
} else if (pQInfo->runtimeEnv.stableQuery) {
|
||||||
|
@ -6172,7 +6178,7 @@ void qTableQuery(qinfo_t qinfo) {
|
||||||
} else {
|
} else {
|
||||||
tableQueryImpl(pQInfo);
|
tableQueryImpl(pQInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
sem_post(&pQInfo->dataReady);
|
sem_post(&pQInfo->dataReady);
|
||||||
// vnodeDecRefCount(pQInfo);
|
// vnodeDecRefCount(pQInfo);
|
||||||
}
|
}
|
||||||
|
@ -6263,61 +6269,61 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
|
||||||
static void buildTagQueryResult(SQInfo* pQInfo) {
|
static void buildTagQueryResult(SQInfo* pQInfo) {
|
||||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
size_t num = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
|
size_t num = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
|
||||||
assert(num == 1); // only one group
|
assert(num == 1); // only one group
|
||||||
|
|
||||||
SArray* pa = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0);
|
SArray* pa = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0);
|
||||||
num = taosArrayGetSize(pa);
|
num = taosArrayGetSize(pa);
|
||||||
|
|
||||||
assert(num == pQInfo->groupInfo.numOfTables);
|
assert(num == pQInfo->groupInfo.numOfTables);
|
||||||
int16_t type, bytes;
|
int16_t type, bytes;
|
||||||
|
|
||||||
int32_t functionId = pQuery->pSelectExpr[0].base.functionId;
|
int32_t functionId = pQuery->pSelectExpr[0].base.functionId;
|
||||||
if (functionId == TSDB_FUNC_TID_TAG) { // return the tags & table Id
|
if (functionId == TSDB_FUNC_TID_TAG) { // return the tags & table Id
|
||||||
assert(pQuery->numOfOutput == 1);
|
assert(pQuery->numOfOutput == 1);
|
||||||
SExprInfo* pExprInfo = &pQuery->pSelectExpr[0];
|
SExprInfo* pExprInfo = &pQuery->pSelectExpr[0];
|
||||||
|
|
||||||
int32_t rsize = pExprInfo->bytes;
|
int32_t rsize = pExprInfo->bytes;
|
||||||
char* data = NULL;
|
char* data = NULL;
|
||||||
|
|
||||||
for(int32_t i = 0; i < num; ++i) {
|
for(int32_t i = 0; i < num; ++i) {
|
||||||
SGroupItem* item = taosArrayGet(pa, i);
|
SGroupItem* item = taosArrayGet(pa, i);
|
||||||
|
|
||||||
char* output = pQuery->sdata[0]->data + i * rsize;
|
char* output = pQuery->sdata[0]->data + i * rsize;
|
||||||
*(int64_t*) output = item->id.uid; // memory align problem
|
*(int64_t*) output = item->id.uid; // memory align problem
|
||||||
output += sizeof(item->id.uid);
|
output += sizeof(item->id.uid);
|
||||||
|
|
||||||
*(int32_t*) output = item->id.tid;
|
*(int32_t*) output = item->id.tid;
|
||||||
output += sizeof(item->id.tid);
|
output += sizeof(item->id.tid);
|
||||||
|
|
||||||
*(int32_t*) output = pQInfo->vgId;
|
*(int32_t*) output = pQInfo->vgId;
|
||||||
output += sizeof(pQInfo->vgId);
|
output += sizeof(pQInfo->vgId);
|
||||||
|
|
||||||
tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo->base.colInfo.colId, &type, &bytes, &data);
|
tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo->base.colInfo.colId, &type, &bytes, &data);
|
||||||
memcpy(output, data, bytes);
|
memcpy(output, data, bytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
qTrace("QInfo:%p create (tableId, tag) info completed, rows:%d", pQInfo, num);
|
qTrace("QInfo:%p create (tableId, tag) info completed, rows:%d", pQInfo, num);
|
||||||
} else { // return only the tags|table name etc.
|
} else { // return only the tags|table name etc.
|
||||||
for(int32_t i = 0; i < num; ++i) {
|
for(int32_t i = 0; i < num; ++i) {
|
||||||
SExprInfo* pExprInfo = pQuery->pSelectExpr;
|
SExprInfo* pExprInfo = pQuery->pSelectExpr;
|
||||||
SGroupItem* item = taosArrayGet(pa, i);
|
SGroupItem* item = taosArrayGet(pa, i);
|
||||||
|
|
||||||
char* data = NULL;
|
char* data = NULL;
|
||||||
for(int32_t j = 0; j < pQuery->numOfOutput; ++j) {
|
for(int32_t j = 0; j < pQuery->numOfOutput; ++j) {
|
||||||
// todo check the return value, refactor codes
|
// todo check the return value, refactor codes
|
||||||
if (pExprInfo[j].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
|
if (pExprInfo[j].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
|
||||||
tsdbGetTableName(pQInfo->tsdb, &item->id, &data);
|
tsdbGetTableName(pQInfo->tsdb, &item->id, &data);
|
||||||
|
|
||||||
char* dst = pQuery->sdata[j]->data + i * (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE);
|
char* dst = pQuery->sdata[j]->data + i * (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE);
|
||||||
STR_WITH_MAXSIZE_TO_VARSTR(dst, data, TSDB_TABLE_NAME_LEN);
|
STR_WITH_MAXSIZE_TO_VARSTR(dst, data, TSDB_TABLE_NAME_LEN);
|
||||||
tfree(data);
|
tfree(data);
|
||||||
|
|
||||||
} else {// todo refactor, return the true length of binary|nchar data
|
} else {// todo refactor, return the true length of binary|nchar data
|
||||||
tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo[j].base.colInfo.colId, &type, &bytes, &data);
|
tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo[j].base.colInfo.colId, &type, &bytes, &data);
|
||||||
assert(bytes == pExprInfo[j].bytes && type == pExprInfo[j].type);
|
assert(bytes == pExprInfo[j].bytes && type == pExprInfo[j].type);
|
||||||
|
|
||||||
char* dst = pQuery->sdata[j]->data + i * bytes;
|
char* dst = pQuery->sdata[j]->data + i * bytes;
|
||||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
memcpy(dst, data, varDataTLen(data));
|
memcpy(dst, data, varDataTLen(data));
|
||||||
|
@ -6325,13 +6331,13 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
|
||||||
memcpy(dst, data, bytes);
|
memcpy(dst, data, bytes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
qTrace("QInfo:%p create tag values results completed, rows:%d", pQInfo, num);
|
qTrace("QInfo:%p create tag values results completed, rows:%d", pQInfo, num);
|
||||||
}
|
}
|
||||||
|
|
||||||
pQuery->rec.rows = num;
|
pQuery->rec.rows = num;
|
||||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,11 +39,11 @@ void vnodeInitReadFp(void) {
|
||||||
int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, SRspRet *ret) {
|
int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, SRspRet *ret) {
|
||||||
SVnodeObj *pVnode = (SVnodeObj *)param;
|
SVnodeObj *pVnode = (SVnodeObj *)param;
|
||||||
|
|
||||||
if (vnodeProcessReadMsgFp[msgType] == NULL)
|
if (vnodeProcessReadMsgFp[msgType] == NULL)
|
||||||
return TSDB_CODE_MSG_NOT_PROCESSED;
|
return TSDB_CODE_MSG_NOT_PROCESSED;
|
||||||
|
|
||||||
if (pVnode->status == TAOS_VN_STATUS_DELETING || pVnode->status == TAOS_VN_STATUS_CLOSING)
|
if (pVnode->status == TAOS_VN_STATUS_DELETING || pVnode->status == TAOS_VN_STATUS_CLOSING)
|
||||||
return TSDB_CODE_NOT_ACTIVE_VNODE;
|
return TSDB_CODE_NOT_ACTIVE_VNODE;
|
||||||
|
|
||||||
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pCont, contLen, ret);
|
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pCont, contLen, ret);
|
||||||
}
|
}
|
||||||
|
@ -53,26 +53,29 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
|
||||||
memset(pRet, 0, sizeof(SRspRet));
|
memset(pRet, 0, sizeof(SRspRet));
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
qinfo_t pQInfo = NULL;
|
qinfo_t pQInfo = NULL;
|
||||||
if (contLen != 0) {
|
if (contLen != 0) {
|
||||||
pRet->code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo);
|
pRet->code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo);
|
||||||
|
|
||||||
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
|
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
|
||||||
pRsp->qhandle = htobe64((uint64_t) (pQInfo));
|
pRsp->qhandle = htobe64((uint64_t) (pQInfo));
|
||||||
pRsp->code = pRet->code;
|
pRsp->code = pRet->code;
|
||||||
|
|
||||||
pRet->len = sizeof(SQueryTableRsp);
|
pRet->len = sizeof(SQueryTableRsp);
|
||||||
pRet->rsp = pRsp;
|
pRet->rsp = pRsp;
|
||||||
|
|
||||||
dTrace("pVnode:%p vgId:%d QInfo:%p, dnode query msg disposed", pVnode, pVnode->vgId, pQInfo);
|
dTrace("pVnode:%p vgId:%d QInfo:%p, dnode query msg disposed", pVnode, pVnode->vgId, pQInfo);
|
||||||
} else {
|
} else {
|
||||||
|
assert(pCont != NULL);
|
||||||
pQInfo = pCont;
|
pQInfo = pCont;
|
||||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
qTableQuery(pQInfo); // do execute query
|
if (pQInfo != NULL) {
|
||||||
|
qTableQuery(pQInfo); // do execute query
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -84,7 +87,7 @@ static int32_t vnodeProcessRetrieveMsg(SVnodeObj *pVnode, void *pCont, int32_t c
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
dTrace("pVnode:%p vgId:%d QInfo:%p, retrieve msg is received", pVnode, pVnode->vgId, pQInfo);
|
dTrace("pVnode:%p vgId:%d QInfo:%p, retrieve msg is received", pVnode, pVnode->vgId, pQInfo);
|
||||||
|
|
||||||
pRet->code = qRetrieveQueryResultInfo(pQInfo);
|
pRet->code = qRetrieveQueryResultInfo(pQInfo);
|
||||||
if (pRet->code != TSDB_CODE_SUCCESS) {
|
if (pRet->code != TSDB_CODE_SUCCESS) {
|
||||||
//TODO
|
//TODO
|
||||||
|
@ -103,7 +106,7 @@ static int32_t vnodeProcessRetrieveMsg(SVnodeObj *pVnode, void *pCont, int32_t c
|
||||||
vnodeRelease(pVnode);
|
vnodeRelease(pVnode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
dTrace("pVnode:%p vgId:%d QInfo:%p, retrieve msg is disposed", pVnode, pVnode->vgId, pQInfo);
|
dTrace("pVnode:%p vgId:%d QInfo:%p, retrieve msg is disposed", pVnode, pVnode->vgId, pQInfo);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue