Merge pull request #17389 from taosdata/fix/liao_cov
fix(query): fix bug in group by tag.
This commit is contained in:
commit
08397dc698
|
@ -151,9 +151,6 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
|
||||||
", last file index:%d, last block index:%d, entry:%d, %p, elapsed time:%.2f ms, %s",
|
", last file index:%d, last block index:%d, entry:%d, %p, elapsed time:%.2f ms, %s",
|
||||||
pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock, el,
|
pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock, el,
|
||||||
idStr);
|
idStr);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk;
|
pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk;
|
||||||
tsdbDebug("last block index list:%d, %d, %s", pInfo->blockIndex[0], pInfo->blockIndex[1], idStr);
|
tsdbDebug("last block index list:%d, %d, %s", pInfo->blockIndex[0], pInfo->blockIndex[1], idStr);
|
||||||
|
@ -466,8 +463,8 @@ static void findNextValidRow(SLDataIter *pIter, const char *idStr) {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
|
bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
|
||||||
int32_t code = 0;
|
|
||||||
int32_t step = pIter->backward ? -1 : 1;
|
int32_t step = pIter->backward ? -1 : 1;
|
||||||
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
// no qualified last file block in current file, no need to fetch row
|
// no qualified last file block in current file, no need to fetch row
|
||||||
if (pIter->pSttBlk == NULL) {
|
if (pIter->pSttBlk == NULL) {
|
||||||
|
@ -476,6 +473,10 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
|
||||||
|
|
||||||
int32_t iBlockL = pIter->iSttBlk;
|
int32_t iBlockL = pIter->iSttBlk;
|
||||||
SBlockData *pBlockData = loadLastBlock(pIter, idStr);
|
SBlockData *pBlockData = loadLastBlock(pIter, idStr);
|
||||||
|
if (pBlockData == NULL && terrno != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
pIter->iRow += step;
|
pIter->iRow += step;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -501,11 +502,7 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
|
||||||
pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow);
|
pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
return (terrno == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL);
|
||||||
terrno = code;
|
|
||||||
}
|
|
||||||
|
|
||||||
return (code == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SRowInfo *tLDataIterGet(SLDataIter *pIter) { return &pIter->rInfo; }
|
SRowInfo *tLDataIterGet(SLDataIter *pIter) { return &pIter->rInfo; }
|
||||||
|
|
|
@ -340,7 +340,7 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdb
|
||||||
pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
|
pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
|
||||||
if (pIter->pLastBlockReader == NULL) {
|
if (pIter->pLastBlockReader == NULL) {
|
||||||
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
|
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
tsdbError("failed to prepare the last block iterator, code:%d %s", tstrerror(code), pReader->idStr);
|
tsdbError("failed to prepare the last block iterator, code:%s %s", tstrerror(code), pReader->idStr);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -646,7 +646,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
|
||||||
|
|
||||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
tsdbDebug(
|
tsdbDebug(
|
||||||
"load block of %d tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
|
"load block of %"PRIzu" tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
|
||||||
"time:%.2f ms %s",
|
"time:%.2f ms %s",
|
||||||
numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1000.0, el,
|
numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1000.0, el,
|
||||||
pReader->idStr);
|
pReader->idStr);
|
||||||
|
@ -1515,6 +1515,11 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
|
||||||
|
|
||||||
taosMemoryFree(pReader->pMemSchema);
|
taosMemoryFree(pReader->pMemSchema);
|
||||||
int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
|
int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
terrno = code;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
return pReader->pMemSchema;
|
return pReader->pMemSchema;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3085,7 +3090,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
|
||||||
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||||
|
|
||||||
tRowMerge(&merge, piRow);
|
tRowMerge(&merge, piRow);
|
||||||
doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = tRowMergerGetRow(&merge, pTSRow);
|
int32_t code = tRowMergerGetRow(&merge, pTSRow);
|
||||||
|
@ -3443,7 +3448,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsdbError("failed to create data reader, code:%s %s", tstrerror(code), pReader->idStr);
|
tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3732,7 +3737,7 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
||||||
if (isEmptyQueryTimeWindow(&pReader->window)) {
|
if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1138,7 +1138,7 @@ static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, i
|
||||||
s.bytes = bytes;
|
s.bytes = bytes;
|
||||||
s.slotId = slotId;
|
s.slotId = slotId;
|
||||||
s.precision = precision;
|
s.precision = precision;
|
||||||
strncpy(s.name, name, tListLen(s.name));
|
tstrncpy(s.name, name, tListLen(s.name));
|
||||||
|
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
@ -1366,7 +1366,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
|
||||||
fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet);
|
fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet);
|
||||||
} else {
|
} else {
|
||||||
char* udfName = pExpr->pExpr->_function.pFunctNode->functionName;
|
char* udfName = pExpr->pExpr->_function.pFunctNode->functionName;
|
||||||
strncpy(pCtx->udfName, udfName, TSDB_FUNC_NAME_LEN);
|
tstrncpy(pCtx->udfName, udfName, TSDB_FUNC_NAME_LEN);
|
||||||
fmGetUdafExecFuncs(pCtx->functionId, &pCtx->fpSet);
|
fmGetUdafExecFuncs(pCtx->functionId, &pCtx->fpSet);
|
||||||
}
|
}
|
||||||
pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
|
pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
|
||||||
|
|
|
@ -616,7 +616,7 @@ int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
|
||||||
|
|
||||||
int32_t nOptrWithVal = 0;
|
int32_t nOptrWithVal = 0;
|
||||||
int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal);
|
int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal);
|
||||||
if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal = 0)) {
|
if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) {
|
||||||
taosMemoryFreeClear(*pOutput);
|
taosMemoryFreeClear(*pOutput);
|
||||||
*len = 0;
|
*len = 0;
|
||||||
}
|
}
|
||||||
|
@ -701,10 +701,10 @@ int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo mtInfo) {
|
int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) {
|
||||||
memset(pCond, 0, sizeof(SQueryTableDataCond));
|
memset(pCond, 0, sizeof(SQueryTableDataCond));
|
||||||
pCond->order = TSDB_ORDER_ASC;
|
pCond->order = TSDB_ORDER_ASC;
|
||||||
pCond->numOfCols = mtInfo.schema->nCols;
|
pCond->numOfCols = pMtInfo->schema->nCols;
|
||||||
pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
|
pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
|
||||||
if (pCond->colList == NULL) {
|
if (pCond->colList == NULL) {
|
||||||
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
|
@ -712,15 +712,15 @@ int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* s
|
||||||
}
|
}
|
||||||
|
|
||||||
pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
|
pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
|
||||||
pCond->suid = mtInfo.suid;
|
pCond->suid = pMtInfo->suid;
|
||||||
pCond->type = TIMEWINDOW_RANGE_CONTAINED;
|
pCond->type = TIMEWINDOW_RANGE_CONTAINED;
|
||||||
pCond->startVersion = -1;
|
pCond->startVersion = -1;
|
||||||
pCond->endVersion = sContext->snapVersion;
|
pCond->endVersion = sContext->snapVersion;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
|
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
|
||||||
pCond->colList[i].type = mtInfo.schema->pSchema[i].type;
|
pCond->colList[i].type = pMtInfo->schema->pSchema[i].type;
|
||||||
pCond->colList[i].bytes = mtInfo.schema->pSchema[i].bytes;
|
pCond->colList[i].bytes = pMtInfo->schema->pSchema[i].bytes;
|
||||||
pCond->colList[i].colId = mtInfo.schema->pSchema[i].colId;
|
pCond->colList[i].colId = pMtInfo->schema->pSchema[i].colId;
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -844,7 +844,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
taosArrayDestroy(pTaskInfo->tableqinfoList.pTableList);
|
taosArrayDestroy(pTaskInfo->tableqinfoList.pTableList);
|
||||||
if (mtInfo.uid == 0) return 0; // no data
|
if (mtInfo.uid == 0) return 0; // no data
|
||||||
|
|
||||||
initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, mtInfo);
|
initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo);
|
||||||
pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
|
pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
|
||||||
pTaskInfo->tableqinfoList.pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
|
pTaskInfo->tableqinfoList.pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
|
||||||
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0});
|
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0});
|
||||||
|
|
|
@ -895,33 +895,6 @@ static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockI
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
|
|
||||||
#if 0
|
|
||||||
SqlFunctionCtx* pCtx = pTableScanInfo->pCtx;
|
|
||||||
uint32_t status = BLK_DATA_NOT_LOAD;
|
|
||||||
|
|
||||||
int32_t numOfOutput = 0; // pTableScanInfo->numOfOutput;
|
|
||||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
|
||||||
int32_t functionId = pCtx[i].functionId;
|
|
||||||
int32_t colId = pTableScanInfo->pExpr[i].base.pParam[0].pCol->colId;
|
|
||||||
|
|
||||||
// group by + first/last should not apply the first/last block filter
|
|
||||||
if (functionId < 0) {
|
|
||||||
status |= BLK_DATA_DATA_LOAD;
|
|
||||||
return status;
|
|
||||||
} else {
|
|
||||||
// status |= aAggs[functionId].dataReqFunc(&pTableScanInfo->pCtx[i], &pBlock->info.window, colId);
|
|
||||||
// if ((status & BLK_DATA_DATA_LOAD) == BLK_DATA_DATA_LOAD) {
|
|
||||||
// return status;
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return status;
|
|
||||||
#endif
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
|
int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
|
||||||
uint32_t* status) {
|
uint32_t* status) {
|
||||||
*status = BLK_DATA_NOT_LOAD;
|
*status = BLK_DATA_NOT_LOAD;
|
||||||
|
@ -1802,7 +1775,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
} else {
|
} else {
|
||||||
taosMemoryFree(pMsg->pData);
|
taosMemoryFree(pMsg->pData);
|
||||||
pSourceDataInfo->code = code;
|
pSourceDataInfo->code = code;
|
||||||
qDebug("%s fetch rsp received, index:%d, error:%d", pSourceDataInfo->taskId, index, tstrerror(code));
|
qDebug("%s fetch rsp received, index:%d, code:%s", pSourceDataInfo->taskId, index, tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
pSourceDataInfo->status = EX_SOURCE_DATA_READY;
|
pSourceDataInfo->status = EX_SOURCE_DATA_READY;
|
||||||
|
@ -3397,8 +3370,8 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
|
||||||
SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
|
SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
|
||||||
pSchema->colId = pColNode->colId;
|
pSchema->colId = pColNode->colId;
|
||||||
pSchema->type = pColNode->node.resType.type;
|
pSchema->type = pColNode->node.resType.type;
|
||||||
pSchema->type = pColNode->node.resType.bytes;
|
pSchema->bytes = pColNode->node.resType.bytes;
|
||||||
strncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
|
tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
|
||||||
}
|
}
|
||||||
|
|
||||||
// this the tags and pseudo function columns, we only keep the tag columns
|
// this the tags and pseudo function columns, we only keep the tag columns
|
||||||
|
@ -3412,7 +3385,7 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
|
||||||
SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
|
SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
|
||||||
pSchema->colId = pColNode->colId;
|
pSchema->colId = pColNode->colId;
|
||||||
pSchema->type = pColNode->node.resType.type;
|
pSchema->type = pColNode->node.resType.type;
|
||||||
pSchema->type = pColNode->node.resType.bytes;
|
pSchema->bytes = pColNode->node.resType.bytes;
|
||||||
strncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
|
strncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -284,6 +284,19 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
|
||||||
|
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
|
||||||
|
SExprSupp* pSup = &pTableScanInfo->pseudoSup;
|
||||||
|
|
||||||
|
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock,
|
||||||
|
GET_TASKID(pTaskInfo));
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
|
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
|
||||||
uint32_t* status) {
|
uint32_t* status) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
@ -312,8 +325,9 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
||||||
} else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
|
} else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
|
||||||
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
||||||
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||||
pCost->skipBlocks += 1;
|
|
||||||
|
|
||||||
|
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo);
|
||||||
|
pCost->skipBlocks += 1;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
|
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
|
||||||
pCost->loadBlockStatis += 1;
|
pCost->loadBlockStatis += 1;
|
||||||
|
@ -322,6 +336,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
||||||
if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead
|
if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead
|
||||||
qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
||||||
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||||
|
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
|
qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
|
||||||
|
@ -371,17 +386,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
||||||
}
|
}
|
||||||
|
|
||||||
relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true);
|
relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true);
|
||||||
|
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo);
|
||||||
// currently only the tbname pseudo column
|
|
||||||
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
|
|
||||||
SExprSupp* pSup = &pTableScanInfo->pseudoSup;
|
|
||||||
|
|
||||||
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock,
|
|
||||||
GET_TASKID(pTaskInfo));
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTableScanInfo->pFilterNode != NULL) {
|
if (pTableScanInfo->pFilterNode != NULL) {
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
@ -1079,12 +1084,13 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_
|
||||||
|
|
||||||
static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t* gpIdCol, SInterval* pInterval,
|
static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t* gpIdCol, SInterval* pInterval,
|
||||||
SDataBlockInfo* pDataBlockInfo, int32_t* pRowIndex, bool hasGroup) {
|
SDataBlockInfo* pDataBlockInfo, int32_t* pRowIndex, bool hasGroup) {
|
||||||
SResultRowInfo dumyInfo;
|
SResultRowInfo dumyInfo = {0};
|
||||||
dumyInfo.cur.pageId = -1;
|
dumyInfo.cur.pageId = -1;
|
||||||
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCol[*pRowIndex], pInterval, TSDB_ORDER_ASC);
|
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCol[*pRowIndex], pInterval, TSDB_ORDER_ASC);
|
||||||
STimeWindow endWin = win;
|
STimeWindow endWin = win;
|
||||||
STimeWindow preWin = win;
|
STimeWindow preWin = win;
|
||||||
uint64_t groupId = gpIdCol[*pRowIndex];
|
uint64_t groupId = gpIdCol[*pRowIndex];
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (hasGroup) {
|
if (hasGroup) {
|
||||||
(*pRowIndex) += 1;
|
(*pRowIndex) += 1;
|
||||||
|
@ -1148,6 +1154,9 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32
|
||||||
pResult->info.rows++;
|
pResult->info.rows++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
blockDataDestroy(tmpBlock);
|
||||||
|
|
||||||
if (pResult->info.rows > 0) {
|
if (pResult->info.rows > 0) {
|
||||||
pResult->info.calWin = pInfo->updateWin;
|
pResult->info.calWin = pInfo->updateWin;
|
||||||
return pResult;
|
return pResult;
|
||||||
|
@ -1316,7 +1325,7 @@ static void calBlockTag(SExprSupp* pTagCalSup, SSDataBlock* pBlock, SSDataBlock*
|
||||||
|
|
||||||
blockDataEnsureCapacity(pResBlock, 1);
|
blockDataEnsureCapacity(pResBlock, 1);
|
||||||
|
|
||||||
projectApplyFunctions(pTagCalSup->pExprInfo, pResBlock, pSrcBlock, pTagCalSup->pCtx, pTagCalSup->numOfExprs, NULL);
|
projectApplyFunctions(pTagCalSup->pExprInfo, pResBlock, pSrcBlock, pTagCalSup->pCtx, 1, NULL);
|
||||||
ASSERT(pResBlock->info.rows == 1);
|
ASSERT(pResBlock->info.rows == 1);
|
||||||
|
|
||||||
// build tagArray
|
// build tagArray
|
||||||
|
@ -1543,7 +1552,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
tsdbReaderClose(pTSInfo->dataReader);
|
tsdbReaderClose(pTSInfo->dataReader);
|
||||||
pTSInfo->dataReader = NULL;
|
pTSInfo->dataReader = NULL;
|
||||||
tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer);
|
tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer);
|
||||||
qDebug("queue scan tsdb over, switch to wal ver %d", pTaskInfo->streamInfo.snapshotVer + 1);
|
qDebug("queue scan tsdb over, switch to wal ver %"PRId64, pTaskInfo->streamInfo.snapshotVer + 1);
|
||||||
if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) {
|
if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -2120,9 +2129,6 @@ static void destroyStreamScanOperatorInfo(void* param) {
|
||||||
taosMemoryFree(pStreamScan->pPseudoExpr);
|
taosMemoryFree(pStreamScan->pPseudoExpr);
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanupExprSupp(&pStreamScan->tbnameCalSup);
|
|
||||||
cleanupExprSupp(&pStreamScan->tagCalSup);
|
|
||||||
|
|
||||||
updateInfoDestroy(pStreamScan->pUpdateInfo);
|
updateInfoDestroy(pStreamScan->pUpdateInfo);
|
||||||
blockDataDestroy(pStreamScan->pRes);
|
blockDataDestroy(pStreamScan->pRes);
|
||||||
blockDataDestroy(pStreamScan->pUpdateRes);
|
blockDataDestroy(pStreamScan->pUpdateRes);
|
||||||
|
@ -2995,7 +3001,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
int64_t startTs = taosGetTimestampUs();
|
int64_t startTs = taosGetTimestampUs();
|
||||||
strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb));
|
tstrncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb));
|
||||||
strcpy(pInfo->req.user, pInfo->pUser);
|
strcpy(pInfo->req.user, pInfo->pUser);
|
||||||
|
|
||||||
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
|
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
|
||||||
|
|
|
@ -2842,14 +2842,13 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
|
||||||
int32_t numOfCols = 0;
|
int32_t numOfCols = 0;
|
||||||
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
|
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pSessionNode->window.node.pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pSessionNode->window.node.pOutputDataBlockDesc);
|
||||||
|
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||||
|
|
||||||
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
|
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
|
||||||
|
|
||||||
pInfo->twAggSup.waterMark = pSessionNode->window.watermark;
|
pInfo->twAggSup.waterMark = pSessionNode->window.watermark;
|
||||||
pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType;
|
pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType;
|
||||||
pInfo->gap = pSessionNode->gap;
|
pInfo->gap = pSessionNode->gap;
|
||||||
|
@ -4666,7 +4665,6 @@ void destroyStreamStateOperatorInfo(void* param) {
|
||||||
SStreamSessionAggOperatorInfo* pChInfo = pChild->info;
|
SStreamSessionAggOperatorInfo* pChInfo = pChild->info;
|
||||||
destroyStreamSessionAggOperatorInfo(pChInfo);
|
destroyStreamSessionAggOperatorInfo(pChInfo);
|
||||||
taosMemoryFreeClear(pChild);
|
taosMemoryFreeClear(pChild);
|
||||||
taosMemoryFreeClear(pChInfo);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
||||||
|
|
|
@ -18,7 +18,7 @@
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
|
|
||||||
#define SHASH_DEFAULT_LOAD_FACTOR 0.75
|
#define SHASH_DEFAULT_LOAD_FACTOR 0.75
|
||||||
#define HASH_MAX_CAPACITY (1024 * 1024 * 16)
|
#define HASH_MAX_CAPACITY (1024 * 1024 * 16L)
|
||||||
#define SHASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * SHASH_DEFAULT_LOAD_FACTOR)
|
#define SHASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * SHASH_DEFAULT_LOAD_FACTOR)
|
||||||
|
|
||||||
#define GET_SHASH_NODE_KEY(_n, _dl) ((char *)(_n) + sizeof(SHNode) + (_dl))
|
#define GET_SHASH_NODE_KEY(_n, _dl) ((char *)(_n) + sizeof(SHNode) + (_dl))
|
||||||
|
@ -104,7 +104,7 @@ static void tSimpleHashTableResize(SSHashObj *pHashObj) {
|
||||||
|
|
||||||
int32_t newCapacity = (int32_t)(pHashObj->capacity << 1u);
|
int32_t newCapacity = (int32_t)(pHashObj->capacity << 1u);
|
||||||
if (newCapacity > HASH_MAX_CAPACITY) {
|
if (newCapacity > HASH_MAX_CAPACITY) {
|
||||||
uDebug("current capacity:%zu, maximum capacity:%" PRIu64 ", no resize applied due to limitation is reached",
|
uDebug("current capacity:%"PRIzu", maximum capacity:%" PRIu64 ", no resize applied due to limitation is reached",
|
||||||
pHashObj->capacity, HASH_MAX_CAPACITY);
|
pHashObj->capacity, HASH_MAX_CAPACITY);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -124,9 +124,8 @@ static void optSetParentOrder(SLogicNode* pNode, EOrder order) {
|
||||||
|
|
||||||
EDealRes scanPathOptHaveNormalColImpl(SNode* pNode, void* pContext) {
|
EDealRes scanPathOptHaveNormalColImpl(SNode* pNode, void* pContext) {
|
||||||
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||||
// *((bool*)pContext) =
|
*((bool*)pContext) =
|
||||||
// (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType && COLUMN_TYPE_TBNAME != ((SColumnNode*)pNode)->colType);
|
(COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType && COLUMN_TYPE_TBNAME != ((SColumnNode*)pNode)->colType);
|
||||||
*((bool*)pContext) = true;
|
|
||||||
return *((bool*)pContext) ? DEAL_RES_END : DEAL_RES_IGNORE_CHILD;
|
return *((bool*)pContext) ? DEAL_RES_END : DEAL_RES_IGNORE_CHILD;
|
||||||
}
|
}
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
|
|
|
@ -243,8 +243,8 @@ void enumAllWords(STireNode** nodes, char* prefix, SMatch* match) {
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
// combine word string
|
// combine word string
|
||||||
memset(word, 0, sizeof(word));
|
memset(word, 0, tListLen(word));
|
||||||
strcpy(word, prefix);
|
strncpy(word, prefix, len);
|
||||||
word[len] = FIRST_ASCII + i; // append current char
|
word[len] = FIRST_ASCII + i; // append current char
|
||||||
|
|
||||||
// chain middle node
|
// chain middle node
|
||||||
|
@ -315,8 +315,7 @@ void matchPrefixFromTree(STire* tire, char* prefix, SMatch* match) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// return
|
taosMemoryFree(root);
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SMatch* matchPrefix(STire* tire, char* prefix, SMatch* match) {
|
SMatch* matchPrefix(STire* tire, char* prefix, SMatch* match) {
|
||||||
|
|
Loading…
Reference in New Issue