Merge pull request #11262 from taosdata/feature/3.0_liaohj

[td-14493] support having in group by
This commit is contained in:
Haojun Liao 2022-04-06 18:25:52 +08:00 committed by GitHub
commit af32cfcaef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 272 additions and 227 deletions

View File

@ -163,6 +163,7 @@ typedef struct SReqResultInfo {
uint64_t totalRows; uint64_t totalRows;
uint32_t current; uint32_t current;
bool completed; bool completed;
int32_t precision;
int32_t payloadLen; int32_t payloadLen;
} SReqResultInfo; } SReqResultInfo;

View File

@ -825,6 +825,7 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR
pResultInfo->current = 0; pResultInfo->current = 0;
pResultInfo->completed = (pRsp->completed == 1); pResultInfo->completed = (pRsp->completed == 1);
pResultInfo->payloadLen = htonl(pRsp->compLen); pResultInfo->payloadLen = htonl(pRsp->compLen);
pResultInfo->precision = pRsp->precision;
// TODO handle the compressed case // TODO handle the compressed case
pResultInfo->totalRows += pResultInfo->numOfRows; pResultInfo->totalRows += pResultInfo->numOfRows;

View File

@ -323,9 +323,14 @@ int taos_affected_rows(TAOS_RES *res) {
} }
int taos_result_precision(TAOS_RES *res) { int taos_result_precision(TAOS_RES *res) {
SRequestObj* pRequest = (SRequestObj*) res;
if (pRequest == NULL) {
return TSDB_TIME_PRECISION_MILLI; return TSDB_TIME_PRECISION_MILLI;
} }
return pRequest->body.resInfo.precision;
}
int taos_select_db(TAOS *taos, const char *db) { int taos_select_db(TAOS *taos, const char *db) {
STscObj *pObj = (STscObj *)taos; STscObj *pObj = (STscObj *)taos;
if (pObj == NULL) { if (pObj == NULL) {

View File

@ -400,6 +400,7 @@ TEST(testCase, show_vgroup_Test) {
taos_free_result(pRes); taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
} }
#endif
TEST(testCase, create_multiple_tables) { TEST(testCase, create_multiple_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
@ -458,7 +459,7 @@ TEST(testCase, create_multiple_tables) {
taos_free_result(pRes); taos_free_result(pRes);
for (int32_t i = 0; i < 20; ++i) { for (int32_t i = 0; i < 25000; ++i) {
char sql[512] = {0}; char sql[512] = {0};
snprintf(sql, tListLen(sql), snprintf(sql, tListLen(sql),
"create table t_x_%d using st1 tags(2) t_x_%d using st1 tags(5) t_x_%d using st1 tags(911)", i, "create table t_x_%d using st1 tags(2) t_x_%d using st1 tags(5) t_x_%d using st1 tags(911)", i,
@ -652,7 +653,6 @@ TEST(testCase, projection_query_stables) {
taos_free_result(pRes); taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
} }
#endif
TEST(testCase, agg_query_tables) { TEST(testCase, agg_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);

View File

@ -172,9 +172,9 @@ tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo
tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, void* pMemRef); tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, void* pMemRef);
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* pTableBlockInfo); int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* pReader, STableBlockDistInfo* pTableBlockInfo);
bool isTsdbCacheLastRow(tsdbReaderT* pTsdbReadHandle); bool isTsdbCacheLastRow(tsdbReaderT* pReader);
/** /**
* *

View File

@ -3046,8 +3046,8 @@ bool tsdbGetExternalRow(tsdbReaderT pHandle) {
// return code; // return code;
//} //}
bool isTsdbCacheLastRow(tsdbReaderT* pTsdbReadHandle) { bool isTsdbCacheLastRow(tsdbReaderT* pReader) {
return ((STsdbReadHandle *)pTsdbReadHandle)->cachelastrow > TSDB_CACHED_TYPE_NONE; return ((STsdbReadHandle *)pReader)->cachelastrow > TSDB_CACHED_TYPE_NONE;
} }
int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo *groupList) { int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo *groupList) {

View File

@ -554,6 +554,7 @@ typedef struct SGroupbyOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SArray* pGroupCols; SArray* pGroupCols;
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys> SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
SNode* pCondition;
bool isInit; // denote if current val is initialized or not bool isInit; // denote if current val is initialized or not
char* keyBuf; // group by keys for hash char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width int32_t groupKeyLen; // total group by column width
@ -633,9 +634,10 @@ typedef struct SDistinctOperatorInfo {
SHashObj* pSet; SHashObj* pSet;
SSDataBlock* pRes; SSDataBlock* pRes;
bool recordNullVal; // has already record the null value, no need to try again bool recordNullVal; // has already record the null value, no need to try again
int64_t threshold; // todo remove it // int64_t threshold; // todo remove it
int64_t outputCapacity;// todo remove it // int64_t outputCapacity;// todo remove it
int32_t totalBytes; // todo remove it // int32_t totalBytes; // todo remove it
SResultInfo resInfo;
char* buf; char* buf;
SArray* pDistinctDataInfo; SArray* pDistinctDataInfo;
} SDistinctOperatorInfo; } SDistinctOperatorInfo;
@ -654,6 +656,8 @@ void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput);
int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows,
char* pData, int32_t compLen, int32_t numOfOutput, int64_t startTs, char* pData, int32_t compLen, int32_t numOfOutput, int64_t startTs,
uint64_t* total, SArray* pColList); uint64_t* total, SArray* pColList);
void doSetOperatorCompleted(SOperatorInfo* pOperator);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime, SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime,
@ -670,7 +674,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo); SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo);

View File

@ -51,8 +51,6 @@
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP) #define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
#define MULTI_KEY_DELIM "-"
enum { enum {
TS_JOIN_TS_EQUAL = 0, TS_JOIN_TS_EQUAL = 0,
TS_JOIN_TS_NOT_EQUALS = 1, TS_JOIN_TS_NOT_EQUALS = 1,
@ -221,12 +219,13 @@ static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput);
static void destroyOperatorInfo(SOperatorInfo* pOperator); static void destroyOperatorInfo(SOperatorInfo* pOperator);
static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput); static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput);
static void doSetOperatorCompleted(SOperatorInfo* pOperator) { void doSetOperatorCompleted(SOperatorInfo* pOperator) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
if (pOperator->pTaskInfo != NULL) { if (pOperator->pTaskInfo != NULL) {
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
} }
} }
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED) #define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
@ -1245,11 +1244,7 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunction
} }
static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) { static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {
size_t num = 0; size_t num = (pPseudoList != NULL)? taosArrayGetSize(pPseudoList):0;
if (pPseudoList != NULL) {
num = taosArrayGetSize(pPseudoList);
}
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
pCtx[i].pOutput = taosArrayGet(pResult->pDataBlock, i); pCtx[i].pOutput = taosArrayGet(pResult->pDataBlock, i);
} }
@ -1291,8 +1286,10 @@ static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSData
pCtx[k].pOutput = taosArrayGet(pResult->pDataBlock, k); pCtx[k].pOutput = taosArrayGet(pResult->pDataBlock, k);
pCtx[k].offset = pResult->info.rows; // set the start offset pCtx[k].offset = pResult->info.rows; // set the start offset
if (taosArrayGetSize(pPseudoList) > 0) {
int32_t* outputColIndex = taosArrayGet(pPseudoList, 0); int32_t* outputColIndex = taosArrayGet(pPseudoList, 0);
pCtx[k].pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput; pCtx[k].pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
}
int32_t numOfRows = pCtx[k].fpSet.process(&pCtx[k]); int32_t numOfRows = pCtx[k].fpSet.process(&pCtx[k]);
pResult->info.rows += numOfRows; pResult->info.rows += numOfRows;
@ -3278,6 +3275,44 @@ void setResultRowOutputBufInitCtx_rv(SResultRow* pResult, SqlFunctionCtx* pCtx,
} }
} }
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
if (pFilterNode == NULL) {
return;
}
SFilterInfo* filter = NULL;
int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
SFilterColumnParam param1 = {.numOfCols = pBlock->info.numOfCols, .pDataBlock = pBlock->pDataBlock};
code = filterSetDataFromSlotId(filter, &param1);
int8_t* rowRes = NULL;
bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols);
SSDataBlock* px = createOneDataBlock(pBlock);
blockDataEnsureCapacity(px, pBlock->info.rows);
int32_t numOfRow = 0;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, i);
numOfRow = 0;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
if (rowRes[j] == 0) {
continue;
}
colDataAppend(pDst, numOfRow, colDataGetData(pSrc, j), false);
numOfRow += 1;
}
*pSrc = *pDst;
}
pBlock->info.rows = numOfRow;
}
void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, int32_t tableGroupId, SExecTaskInfo* pTaskInfo) { void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, int32_t tableGroupId, SExecTaskInfo* pTaskInfo) {
// for simple group by query without interval, all the tables belong to one group result. // for simple group by query without interval, all the tables belong to one group result.
int64_t uid = 0; int64_t uid = 0;
@ -6212,14 +6247,6 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pSortInfo); taosArrayDestroy(pInfo->pSortInfo);
} }
static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) {
SDistinctOperatorInfo* pInfo = (SDistinctOperatorInfo*)param;
taosHashCleanup(pInfo->pSet);
taosMemoryFreeClear(pInfo->buf);
taosArrayDestroy(pInfo->pDistinctDataInfo);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
SExchangeInfo* pExInfo = (SExchangeInfo*)param; SExchangeInfo* pExInfo = (SExchangeInfo*)param;
taosArrayDestroy(pExInfo->pSources); taosArrayDestroy(pExInfo->pSources);
@ -6791,146 +6818,7 @@ SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo
return pOperator; return pOperator;
} }
static bool initMultiDistinctInfo(SDistinctOperatorInfo* pInfo, SOperatorInfo* pOperator, SSDataBlock* pBlock) {
if (taosArrayGetSize(pInfo->pDistinctDataInfo) == pOperator->numOfOutput) {
// distinct info already inited
return true;
}
for (int i = 0; i < pOperator->numOfOutput; i++) {
// pInfo->totalBytes += pOperator->pExpr[i].base.colBytes;
}
for (int i = 0; i < pOperator->numOfOutput; i++) {
int numOfBlock = (int)(taosArrayGetSize(pBlock->pDataBlock));
assert(i < numOfBlock);
for (int j = 0; j < numOfBlock; j++) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, j);
if (pColDataInfo->info.colId == pOperator->pExpr[i].base.resSchema.colId) {
SDistinctDataInfo item = {.index = j, .type = pColDataInfo->info.type, .bytes = pColDataInfo->info.bytes};
taosArrayInsert(pInfo->pDistinctDataInfo, i, &item);
}
}
}
pInfo->totalBytes += (int32_t)strlen(MULTI_KEY_DELIM) * (pOperator->numOfOutput);
pInfo->buf = taosMemoryCalloc(1, pInfo->totalBytes);
return taosArrayGetSize(pInfo->pDistinctDataInfo) == pOperator->numOfOutput ? true : false;
}
static void buildMultiDistinctKey(SDistinctOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowId) {
char* p = pInfo->buf;
memset(p, 0, pInfo->totalBytes);
for (int i = 0; i < taosArrayGetSize(pInfo->pDistinctDataInfo); i++) {
SDistinctDataInfo* pDistDataInfo = (SDistinctDataInfo*)taosArrayGet(pInfo->pDistinctDataInfo, i);
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index);
char* val = ((char*)pColDataInfo->pData) + pColDataInfo->info.bytes * rowId;
if (isNull(val, pDistDataInfo->type)) {
p += pDistDataInfo->bytes;
continue;
}
if (IS_VAR_DATA_TYPE(pDistDataInfo->type)) {
memcpy(p, varDataVal(val), varDataLen(val));
p += varDataLen(val);
} else {
memcpy(p, val, pDistDataInfo->bytes);
p += pDistDataInfo->bytes;
}
memcpy(p, MULTI_KEY_DELIM, strlen(MULTI_KEY_DELIM));
p += strlen(MULTI_KEY_DELIM);
}
}
static SSDataBlock* hashDistinct(SOperatorInfo* pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SDistinctOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->pRes;
pRes->info.rows = 0;
SSDataBlock* pBlock = NULL;
SOperatorInfo* pDownstream = pOperator->pDownstream[0];
while (1) {
publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pDownstream->getNextFn(pDownstream, newgroup);
publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
doSetOperatorCompleted(pOperator);
break;
}
if (!initMultiDistinctInfo(pInfo, pOperator, pBlock)) {
doSetOperatorCompleted(pOperator);
break;
}
// ensure result output buf
if (pRes->info.rows + pBlock->info.rows > pInfo->outputCapacity) {
int32_t newSize = pRes->info.rows + pBlock->info.rows;
for (int i = 0; i < taosArrayGetSize(pRes->pDataBlock); i++) {
SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, i);
SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, i);
char* tmp = taosMemoryRealloc(pResultColInfoData->pData, newSize * pDistDataInfo->bytes);
if (tmp == NULL) {
return NULL;
} else {
pResultColInfoData->pData = tmp;
}
}
pInfo->outputCapacity = newSize;
}
for (int32_t i = 0; i < pBlock->info.rows; i++) {
buildMultiDistinctKey(pInfo, pBlock, i);
if (taosHashGet(pInfo->pSet, pInfo->buf, pInfo->totalBytes) == NULL) {
int32_t dummy;
taosHashPut(pInfo->pSet, pInfo->buf, pInfo->totalBytes, &dummy, sizeof(dummy));
for (int j = 0; j < taosArrayGetSize(pRes->pDataBlock); j++) {
SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, j); // distinct meta info
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); // src
SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, j); // dist
char* val = ((char*)pColInfoData->pData) + pDistDataInfo->bytes * i;
char* start = pResultColInfoData->pData + pDistDataInfo->bytes * pInfo->pRes->info.rows;
memcpy(start, val, pDistDataInfo->bytes);
}
pRes->info.rows += 1;
}
}
if (pRes->info.rows >= pInfo->threshold) {
break;
}
}
return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL;
}
SOperatorInfo* createDistinctOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) {
SDistinctOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDistinctOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
pOperator->resultInfo.capacity = 4096; // todo extract function.
pInfo->totalBytes = 0;
pInfo->buf = NULL;
pInfo->pDistinctDataInfo = taosArrayInit(numOfCols, sizeof(SDistinctDataInfo));
pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pOperator->name = "DistinctOperator";
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
// pOperator->operatorType = DISTINCT;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo;
pOperator->getNextFn = hashDistinct;
pOperator->closeFn = destroyDistinctOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
}
static int32_t getColumnIndexInSource(SQueriedTableInfo* pTableInfo, SExprBasicInfo* pExpr, SColumnInfo* pTagCols) { static int32_t getColumnIndexInSource(SQueriedTableInfo* pTableInfo, SExprBasicInfo* pExpr, SColumnInfo* pTagCols) {
int32_t j = 0; int32_t j = 0;
@ -7164,7 +7052,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t numOfCols = 0; int32_t numOfCols = 0;
tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId); tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols); SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count,
pScanPhyNode->reverse, pColList, pScanPhyNode->node.pConditions, pTaskInfo); pScanPhyNode->reverse, pColList, pScanPhyNode->node.pConditions, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) { } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) {
@ -7174,8 +7061,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == nodeType(pPhyNode)) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == nodeType(pPhyNode)) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
queryId, taskId);
SArray* tableIdList = extractTableIdList(pTableGroupInfo); SArray* tableIdList = extractTableIdList(pTableGroupInfo);
SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc);
@ -7232,7 +7118,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
if (pAggNode->pGroupKeys != NULL) { if (pAggNode->pGroupKeys != NULL) {
SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys); SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys);
return createGroupOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL); return createGroupOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pTaskInfo, NULL);
} else { } else {
return createAggregateOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo); return createAggregateOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo);
} }

View File

@ -245,13 +245,14 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
} }
SGroupbyOperatorInfo* pInfo = pOperator->info; SGroupbyOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->binfo.pRes;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity, toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pRes, pInfo->binfo.capacity, pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.rowCellInfoOffset); if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
return pInfo->binfo.pRes; return pRes;
} }
int32_t order = TSDB_ORDER_ASC; int32_t order = TSDB_ORDER_ASC;
@ -283,18 +284,29 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
// pInfo->binfo.rowCellInfoOffset); // pInfo->binfo.rowCellInfoOffset);
// } // }
blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity); blockDataEnsureCapacity(pRes, pInfo->binfo.capacity);
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity,
while(1) {
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pRes, pInfo->binfo.capacity,
pInfo->binfo.rowCellInfoOffset); pInfo->binfo.rowCellInfoOffset);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doFilter(pInfo->pCondition, pRes);
bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo);
if (!hasRemain) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
break;
}
if (pRes->info.rows > 0) {
break;
}
} }
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
} }
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo, SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo,
const STableGroupInfo* pTableGroupInfo) { const STableGroupInfo* pTableGroupInfo) {
SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo)); SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
@ -303,6 +315,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
} }
pInfo->pGroupCols = pGroupColList; pInfo->pGroupCols = pGroupColList;
pInfo->pCondition = pCondition;
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str); initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
@ -330,3 +343,165 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
return NULL; return NULL;
} }
#define MULTI_KEY_DELIM "-"
static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) {
SDistinctOperatorInfo* pInfo = (SDistinctOperatorInfo*)param;
taosHashCleanup(pInfo->pSet);
taosMemoryFreeClear(pInfo->buf);
taosArrayDestroy(pInfo->pDistinctDataInfo);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
static void buildMultiDistinctKey(SDistinctOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowId) {
char* p = pInfo->buf;
// memset(p, 0, pInfo->totalBytes);
for (int i = 0; i < taosArrayGetSize(pInfo->pDistinctDataInfo); i++) {
SDistinctDataInfo* pDistDataInfo = (SDistinctDataInfo*)taosArrayGet(pInfo->pDistinctDataInfo, i);
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index);
char* val = ((char*)pColDataInfo->pData) + pColDataInfo->info.bytes * rowId;
if (isNull(val, pDistDataInfo->type)) {
p += pDistDataInfo->bytes;
continue;
}
if (IS_VAR_DATA_TYPE(pDistDataInfo->type)) {
memcpy(p, varDataVal(val), varDataLen(val));
p += varDataLen(val);
} else {
memcpy(p, val, pDistDataInfo->bytes);
p += pDistDataInfo->bytes;
}
memcpy(p, MULTI_KEY_DELIM, strlen(MULTI_KEY_DELIM));
p += strlen(MULTI_KEY_DELIM);
}
}
static bool initMultiDistinctInfo(SDistinctOperatorInfo* pInfo, SOperatorInfo* pOperator) {
for (int i = 0; i < pOperator->numOfOutput; i++) {
// pInfo->totalBytes += pOperator->pExpr[i].base.colBytes;
}
#if 0
for (int i = 0; i < pOperator->numOfOutput; i++) {
int numOfCols = (int)(taosArrayGetSize(pBlock->pDataBlock));
assert(i < numOfCols);
for (int j = 0; j < numOfCols; j++) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, j);
if (pColDataInfo->info.colId == pOperator->pExpr[i].base.resSchema.colId) {
SDistinctDataInfo item = {.index = j, .type = pColDataInfo->info.type, .bytes = pColDataInfo->info.bytes};
taosArrayInsert(pInfo->pDistinctDataInfo, i, &item);
}
}
}
#endif
// pInfo->totalBytes += (int32_t)strlen(MULTI_KEY_DELIM) * (pOperator->numOfOutput);
// pInfo->buf = taosMemoryCalloc(1, pInfo->totalBytes);
return taosArrayGetSize(pInfo->pDistinctDataInfo) == pOperator->numOfOutput ? true : false;
}
static SSDataBlock* hashDistinct(SOperatorInfo* pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SDistinctOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->pRes;
pRes->info.rows = 0;
SSDataBlock* pBlock = NULL;
SOperatorInfo* pDownstream = pOperator->pDownstream[0];
while (1) {
publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pDownstream->getNextFn(pDownstream, newgroup);
publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
doSetOperatorCompleted(pOperator);
break;
}
// ensure result output buf
if (pRes->info.rows + pBlock->info.rows > pInfo->resInfo.capacity) {
int32_t newSize = pRes->info.rows + pBlock->info.rows;
for (int i = 0; i < taosArrayGetSize(pRes->pDataBlock); i++) {
SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, i);
SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, i);
// char* tmp = taosMemoryRealloc(pResultColInfoData->pData, newSize * pDistDataInfo->bytes);
// if (tmp == NULL) {
// return NULL;
// } else {
// pResultColInfoData->pData = tmp;
// }
}
pInfo->resInfo.capacity = newSize;
}
for (int32_t i = 0; i < pBlock->info.rows; i++) {
buildMultiDistinctKey(pInfo, pBlock, i);
if (taosHashGet(pInfo->pSet, pInfo->buf, 0) == NULL) {
taosHashPut(pInfo->pSet, pInfo->buf, 0, NULL, 0);
for (int j = 0; j < taosArrayGetSize(pRes->pDataBlock); j++) {
SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, j); // distinct meta info
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); // src
SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, j); // dist
char* val = ((char*)pColInfoData->pData) + pDistDataInfo->bytes * i;
char* start = pResultColInfoData->pData + pDistDataInfo->bytes * pInfo->pRes->info.rows;
memcpy(start, val, pDistDataInfo->bytes);
}
pRes->info.rows += 1;
}
}
if (pRes->info.rows >= pInfo->resInfo.threshold) {
break;
}
}
return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL;
}
SOperatorInfo* createDistinctOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) {
SDistinctOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDistinctOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
pOperator->resultInfo.capacity = 4096; // todo extract function.
// pInfo->totalBytes = 0;
pInfo->buf = NULL;
pInfo->pDistinctDataInfo = taosArrayInit(numOfCols, sizeof(SDistinctDataInfo));
initMultiDistinctInfo(pInfo, pOperator);
pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pOperator->name = "DistinctOperator";
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
// pOperator->operatorType = DISTINCT;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo;
pOperator->getNextFn = hashDistinct;
pOperator->closeFn = destroyDistinctOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
_error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pInfo);
taosMemoryFree(pOperator);
return NULL;
}

View File

@ -34,6 +34,7 @@
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN) #define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) { void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
SWITCH_ORDER(pCtx[i].order); SWITCH_ORDER(pCtx[i].order);
@ -91,39 +92,7 @@ int32_t loadDataBlock(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo,
taosArraySet(pBlock->pDataBlock, pColMatchInfo->targetSlotId, p); taosArraySet(pBlock->pDataBlock, pColMatchInfo->targetSlotId, p);
} }
if (pTableScanInfo->pFilterNode != NULL) { doFilter(pTableScanInfo->pFilterNode, pBlock);
SFilterInfo* filter = NULL;
int32_t code = filterInitFromNode((SNode*)pTableScanInfo->pFilterNode, &filter, 0);
SFilterColumnParam param1 = {.numOfCols = pBlock->info.numOfCols, .pDataBlock = pBlock->pDataBlock};
code = filterSetDataFromSlotId(filter, &param1);
int8_t* rowRes = NULL;
bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols);
SSDataBlock* px = createOneDataBlock(pBlock);
blockDataEnsureCapacity(px, pBlock->info.rows);
int32_t numOfRow = 0;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, i);
numOfRow = 0;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
if (rowRes[j] == 0) {
continue;
}
colDataAppend(pDst, numOfRow, colDataGetData(pSrc, j), false);
numOfRow += 1;
}
*pSrc = *pDst;
}
pBlock->info.rows = numOfRow;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -37,7 +37,7 @@ typedef struct SHNode {
char data[]; char data[];
} SHNode; } SHNode;
typedef struct SSHashObj { struct SSHashObj {
SHNode **hashList; SHNode **hashList;
size_t capacity; // number of slots size_t capacity; // number of slots
int64_t size; // number of elements in hash table int64_t size; // number of elements in hash table
@ -45,7 +45,7 @@ typedef struct SSHashObj {
_equal_fn_t equalFp; // equal function _equal_fn_t equalFp; // equal function
int32_t keyLen; int32_t keyLen;
int32_t dataLen; int32_t dataLen;
} SSHashObj; };
static FORCE_INLINE int32_t taosHashCapacity(int32_t length) { static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
int32_t len = MIN(length, HASH_MAX_CAPACITY); int32_t len = MIN(length, HASH_MAX_CAPACITY);
@ -107,7 +107,7 @@ static SHNode *doCreateHashNode(const void *key, size_t keyLen, const void *pDat
return pNewNode; return pNewNode;
} }
void taosHashTableResize(SSHashObj *pHashObj) { static void taosHashTableResize(SSHashObj *pHashObj) {
if (!HASH_NEED_RESIZE(pHashObj)) { if (!HASH_NEED_RESIZE(pHashObj)) {
return; return;
} }

View File

@ -917,7 +917,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
colDataAppendInt32(pOutput, pos, &delta); colDataAppendInt32(pOutput, pos, &delta);
} }
if (tsList != NULL) { if (pTsOutput != NULL) {
colDataAppendInt64(pTsOutput, pos, &tsList[i]); colDataAppendInt64(pTsOutput, pos, &tsList[i]);
} }
} }

View File

@ -311,10 +311,6 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo
} }
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal);
if (pNewNode == NULL) {
return -1;
}
// need the resize process, write lock applied // need the resize process, write lock applied
if (HASH_NEED_RESIZE(pHashObj)) { if (HASH_NEED_RESIZE(pHashObj)) {
@ -355,6 +351,11 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo
if (pNode == NULL) { if (pNode == NULL) {
// no data in hash table with the specified key, add it into hash table // no data in hash table with the specified key, add it into hash table
SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal);
if (pNewNode == NULL) {
return -1;
}
pushfrontNodeInEntryList(pe, pNewNode); pushfrontNodeInEntryList(pe, pNewNode);
assert(pe->next != NULL); assert(pe->next != NULL);
@ -368,9 +369,12 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo
} else { } else {
// not support the update operation, return error // not support the update operation, return error
if (pHashObj->enableUpdate) { if (pHashObj->enableUpdate) {
SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal);
if (pNewNode == NULL) {
return -1;
}
doUpdateHashNode(pHashObj, pe, prev, pNode, pNewNode); doUpdateHashNode(pHashObj, pe, prev, pNode, pNewNode);
} else {
FREE_HASH_NODE(pNewNode);
} }
taosHashEntryWUnlock(pHashObj, pe); taosHashEntryWUnlock(pHashObj, pe);