Revert "fix(query): handle partition by in table scan operator"
This commit is contained in:
parent
d0a7a43e6f
commit
4b2fa43dae
|
@ -164,6 +164,14 @@ typedef enum EStreamType {
|
||||||
STREAM_FILL_OVER,
|
STREAM_FILL_OVER,
|
||||||
} EStreamType;
|
} EStreamType;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
SArray* pGroupList;
|
||||||
|
SArray* pTableList;
|
||||||
|
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
|
||||||
|
bool needSortTableByGroupId;
|
||||||
|
uint64_t suid;
|
||||||
|
} STableListInfo;
|
||||||
|
|
||||||
#pragma pack(push, 1)
|
#pragma pack(push, 1)
|
||||||
typedef struct SColumnDataAgg {
|
typedef struct SColumnDataAgg {
|
||||||
int16_t colId;
|
int16_t colId;
|
||||||
|
|
|
@ -152,9 +152,10 @@ typedef struct STsdbReader STsdbReader;
|
||||||
#define CACHESCAN_RETRIEVE_LAST_ROW 0x4
|
#define CACHESCAN_RETRIEVE_LAST_ROW 0x4
|
||||||
#define CACHESCAN_RETRIEVE_LAST 0x8
|
#define CACHESCAN_RETRIEVE_LAST 0x8
|
||||||
|
|
||||||
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num);
|
int32_t tsdbSetTableId(STsdbReader *pReader, int64_t uid);
|
||||||
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
|
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTableList, STsdbReader **ppReader,
|
||||||
STsdbReader **ppReader, const char *idstr);
|
const char *idstr);
|
||||||
|
|
||||||
void tsdbReaderClose(STsdbReader *pReader);
|
void tsdbReaderClose(STsdbReader *pReader);
|
||||||
bool tsdbNextDataBlock(STsdbReader *pReader);
|
bool tsdbNextDataBlock(STsdbReader *pReader);
|
||||||
bool tsdbTableNextDataBlock(STsdbReader *pReader, int64_t uid);
|
bool tsdbTableNextDataBlock(STsdbReader *pReader, int64_t uid);
|
||||||
|
@ -169,8 +170,8 @@ void *tsdbGetIdx(SMeta *pMeta);
|
||||||
void *tsdbGetIvtIdx(SMeta *pMeta);
|
void *tsdbGetIvtIdx(SMeta *pMeta);
|
||||||
uint64_t getReaderMaxVersion(STsdbReader *pReader);
|
uint64_t getReaderMaxVersion(STsdbReader *pReader);
|
||||||
|
|
||||||
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables,
|
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, uint64_t suid,
|
||||||
int32_t numOfCols, uint64_t suid, void** pReader);
|
void **pReader);
|
||||||
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
|
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
|
||||||
void *tsdbCacherowsReaderClose(void *pReader);
|
void *tsdbCacherowsReaderClose(void *pReader);
|
||||||
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
|
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
|
||||||
|
|
|
@ -716,10 +716,7 @@ typedef struct SCacheRowsReader {
|
||||||
int32_t numOfCols;
|
int32_t numOfCols;
|
||||||
int32_t type;
|
int32_t type;
|
||||||
int32_t tableIndex; // currently returned result tables
|
int32_t tableIndex; // currently returned result tables
|
||||||
|
SArray *pTableList; // table id list
|
||||||
STableKeyInfo *pTableList; // table id list
|
|
||||||
int32_t numOfTables;
|
|
||||||
|
|
||||||
SSttBlockLoadInfo *pLoadInfo;
|
SSttBlockLoadInfo *pLoadInfo;
|
||||||
STsdbReadSnap *pReadSnap;
|
STsdbReadSnap *pReadSnap;
|
||||||
SDataFReader *pDataFReader;
|
SDataFReader *pDataFReader;
|
||||||
|
|
|
@ -162,7 +162,10 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSub
|
||||||
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
|
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
|
||||||
SSubmitBlkRsp* pRsp);
|
SSubmitBlkRsp* pRsp);
|
||||||
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
||||||
|
STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
||||||
|
void* pMemRef);
|
||||||
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
|
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
|
||||||
|
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list);
|
||||||
|
|
||||||
// tq
|
// tq
|
||||||
int tqInit();
|
int tqInit();
|
||||||
|
|
|
@ -713,13 +713,13 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
|
||||||
|
|
||||||
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
|
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
|
||||||
taosMemoryFreeClear(pReq);
|
taosMemoryFreeClear(pReq);
|
||||||
smaError("vgId:%d, process submit req for rsma suid:%"PRIu64", uid:%" PRIu64 " level %" PRIi8 " failed since %s",
|
smaError("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " failed since %s",
|
||||||
SMA_VID(pSma), suid, output->info.uid, pItem->level, terrstr());
|
SMA_VID(pSma), suid, pItem->level, terrstr());
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%"PRIu64", level %" PRIi8 " ver %" PRIi64 " len %" PRIu32,
|
smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " ver %" PRIi64 " len %" PRIu32,
|
||||||
SMA_VID(pSma), suid, output->info.uid, pItem->level, output->info.version, htonl(pReq->header.contLen));
|
SMA_VID(pSma), suid, pItem->level, output->info.version, htonl(pReq->header.contLen));
|
||||||
|
|
||||||
taosMemoryFreeClear(pReq);
|
taosMemoryFreeClear(pReq);
|
||||||
}
|
}
|
||||||
|
|
|
@ -97,9 +97,10 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
|
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t numOfCols, uint64_t suid,
|
||||||
uint64_t suid, void** pReader) {
|
void** pReader) {
|
||||||
*pReader = NULL;
|
*pReader = NULL;
|
||||||
|
|
||||||
SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
|
SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -110,15 +111,14 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
|
||||||
p->numOfCols = numOfCols;
|
p->numOfCols = numOfCols;
|
||||||
p->suid = suid;
|
p->suid = suid;
|
||||||
|
|
||||||
if (numOfTables == 0) {
|
if (taosArrayGetSize(pTableIdList) == 0) {
|
||||||
*pReader = p;
|
*pReader = p;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pTableIdList)[0];
|
STableKeyInfo* pKeyInfo = taosArrayGet(pTableIdList, 0);
|
||||||
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1, 1);
|
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1, 1);
|
||||||
p->pTableList = pTableIdList;
|
p->pTableList = pTableIdList;
|
||||||
p->numOfTables = numOfTables;
|
|
||||||
|
|
||||||
p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES);
|
p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES);
|
||||||
if (p->transferBuf == NULL) {
|
if (p->transferBuf == NULL) {
|
||||||
|
@ -205,6 +205,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache;
|
SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache;
|
||||||
LRUHandle* h = NULL;
|
LRUHandle* h = NULL;
|
||||||
SArray* pRow = NULL;
|
SArray* pRow = NULL;
|
||||||
|
size_t numOfTables = taosArrayGetSize(pr->pTableList);
|
||||||
bool hasRes = false;
|
bool hasRes = false;
|
||||||
SArray* pLastCols = NULL;
|
SArray* pLastCols = NULL;
|
||||||
|
|
||||||
|
@ -242,8 +243,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
|
|
||||||
// retrieve the only one last row of all tables in the uid list.
|
// retrieve the only one last row of all tables in the uid list.
|
||||||
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
|
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
|
||||||
for (int32_t i = 0; i < pr->numOfTables; ++i) {
|
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||||
STableKeyInfo* pKeyInfo = &pr->pTableList[i];
|
STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
|
||||||
|
|
||||||
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
|
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -307,8 +308,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
}
|
}
|
||||||
|
|
||||||
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
|
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
|
||||||
for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
|
for (int32_t i = pr->tableIndex; i < numOfTables; ++i) {
|
||||||
STableKeyInfo* pKeyInfo = &pr->pTableList[i];
|
STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pr->pTableList, i);
|
||||||
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
|
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -270,7 +270,10 @@ static void resetDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void clearBlockScanInfo(STableBlockScanInfo* p) {
|
static void destroyBlockScanInfo(SHashObj* pTableMap) {
|
||||||
|
STableBlockScanInfo* p = NULL;
|
||||||
|
|
||||||
|
while ((p = taosHashIterate(pTableMap, p)) != NULL) {
|
||||||
p->iterInit = false;
|
p->iterInit = false;
|
||||||
p->iiter.hasVal = false;
|
p->iiter.hasVal = false;
|
||||||
|
|
||||||
|
@ -285,12 +288,6 @@ static void clearBlockScanInfo(STableBlockScanInfo* p) {
|
||||||
p->delSkyline = taosArrayDestroy(p->delSkyline);
|
p->delSkyline = taosArrayDestroy(p->delSkyline);
|
||||||
p->pBlockList = taosArrayDestroy(p->pBlockList);
|
p->pBlockList = taosArrayDestroy(p->pBlockList);
|
||||||
tMapDataClear(&p->mapData);
|
tMapDataClear(&p->mapData);
|
||||||
}
|
|
||||||
|
|
||||||
static void destroyBlockScanInfo(SHashObj* pTableMap) {
|
|
||||||
STableBlockScanInfo* p = NULL;
|
|
||||||
while ((p = taosHashIterate(pTableMap, p)) != NULL) {
|
|
||||||
clearBlockScanInfo(p);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashCleanup(pTableMap);
|
taosHashCleanup(pTableMap);
|
||||||
|
@ -3455,23 +3452,13 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO refactor: with createDataBlockScanInfo
|
// todo refactor, use arraylist instead
|
||||||
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
|
int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) {
|
||||||
ASSERT(pReader != NULL);
|
ASSERT(pReader != NULL);
|
||||||
|
|
||||||
STableBlockScanInfo* p = NULL;
|
|
||||||
while ((p = taosHashIterate(pReader->status.pTableMap, p)) != NULL) {
|
|
||||||
clearBlockScanInfo(p);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosHashClear(pReader->status.pTableMap);
|
taosHashClear(pReader->status.pTableMap);
|
||||||
|
|
||||||
STableKeyInfo* pList = (STableKeyInfo*) pTableList;
|
STableBlockScanInfo info = {.lastKey = 0, .uid = uid};
|
||||||
for(int32_t i = 0; i < num; ++i) {
|
|
||||||
STableBlockScanInfo info = {.lastKey = 0, .uid = pList[i].uid};
|
|
||||||
taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
|
taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
|
||||||
}
|
|
||||||
|
|
||||||
return TDB_CODE_SUCCESS;
|
return TDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3507,8 +3494,8 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ====================================== EXPOSED APIs ======================================
|
// ====================================== EXPOSED APIs ======================================
|
||||||
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
|
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader,
|
||||||
STsdbReader** ppReader, const char* idstr) {
|
const char* idstr) {
|
||||||
STimeWindow window = pCond->twindows;
|
STimeWindow window = pCond->twindows;
|
||||||
if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
|
if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
|
||||||
pCond->twindows.skey += 1;
|
pCond->twindows.skey += 1;
|
||||||
|
@ -3567,8 +3554,8 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
|
||||||
if (pReader->pSchema == NULL) {
|
if (pReader->pSchema == NULL) {
|
||||||
tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->suid, pReader->idStr);
|
tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->suid, pReader->idStr);
|
||||||
}
|
}
|
||||||
} else if (numOfTables > 0) {
|
} else if (taosArrayGetSize(pTableList) > 0) {
|
||||||
STableKeyInfo* pKey = pTableList;
|
STableKeyInfo* pKey = taosArrayGet(pTableList, 0);
|
||||||
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1);
|
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1);
|
||||||
if (pReader->pSchema == NULL) {
|
if (pReader->pSchema == NULL) {
|
||||||
tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr);
|
tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr);
|
||||||
|
@ -3577,7 +3564,8 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
|
||||||
|
|
||||||
STsdbReader* p = pReader->innerReader[0] != NULL ? pReader->innerReader[0] : pReader;
|
STsdbReader* p = pReader->innerReader[0] != NULL ? pReader->innerReader[0] : pReader;
|
||||||
|
|
||||||
pReader->status.pTableMap = createDataBlockScanInfo(p, pTableList, numOfTables);
|
int32_t numOfTables = taosArrayGetSize(pTableList);
|
||||||
|
pReader->status.pTableMap = createDataBlockScanInfo(p, pTableList->pData, numOfTables);
|
||||||
if (pReader->status.pTableMap == NULL) {
|
if (pReader->status.pTableMap == NULL) {
|
||||||
tsdbReaderClose(pReader);
|
tsdbReaderClose(pReader);
|
||||||
*ppReader = NULL;
|
*ppReader = NULL;
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
do { \
|
do { \
|
||||||
ASSERT((_c) != -1); \
|
ASSERT((_c) != -1); \
|
||||||
longjmp((_obj), (_c)); \
|
longjmp((_obj), (_c)); \
|
||||||
} while (0)
|
} while (0);
|
||||||
|
|
||||||
#define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \
|
#define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \
|
||||||
do { \
|
do { \
|
||||||
|
@ -95,25 +95,6 @@ typedef struct SColMatchInfo {
|
||||||
int32_t matchType; // determinate the source according to col id or slot id
|
int32_t matchType; // determinate the source according to col id or slot id
|
||||||
} SColMatchInfo;
|
} SColMatchInfo;
|
||||||
|
|
||||||
// If the numOfOutputGroups is 1, the data blocks that belongs to different groups will be provided randomly
|
|
||||||
// The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups
|
|
||||||
typedef struct STableListInfo {
|
|
||||||
bool oneTableForEachGroup;
|
|
||||||
int32_t numOfOuputGroups; // the data block will be generated one by one
|
|
||||||
int32_t* groupOffset; // keep the offset value for each group in the tableList
|
|
||||||
SArray* pTableList;
|
|
||||||
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
|
|
||||||
uint64_t suid;
|
|
||||||
} STableListInfo;
|
|
||||||
|
|
||||||
void destroyTableList(STableListInfo* pTableList);
|
|
||||||
int32_t getNumOfOutputGroups(const STableListInfo* pTableList);
|
|
||||||
bool oneTableForEachGroup(const STableListInfo* pTableList);
|
|
||||||
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid);
|
|
||||||
int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid);
|
|
||||||
int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, int32_t* num);
|
|
||||||
uint64_t getTotalTables(const STableListInfo* pTableList);
|
|
||||||
|
|
||||||
struct SqlFunctionCtx;
|
struct SqlFunctionCtx;
|
||||||
|
|
||||||
size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
||||||
|
|
|
@ -176,10 +176,11 @@ typedef struct {
|
||||||
typedef struct SExecTaskInfo {
|
typedef struct SExecTaskInfo {
|
||||||
STaskIdInfo id;
|
STaskIdInfo id;
|
||||||
uint32_t status;
|
uint32_t status;
|
||||||
int32_t code;
|
|
||||||
STimeWindow window;
|
STimeWindow window;
|
||||||
STaskCostInfo cost;
|
STaskCostInfo cost;
|
||||||
int64_t owner; // if it is in execution
|
int64_t owner; // if it is in execution
|
||||||
|
int32_t code;
|
||||||
|
|
||||||
int64_t version; // used for stream to record wal version
|
int64_t version; // used for stream to record wal version
|
||||||
SStreamTaskInfo streamInfo;
|
SStreamTaskInfo streamInfo;
|
||||||
SSchemaInfo schemaInfo;
|
SSchemaInfo schemaInfo;
|
||||||
|
@ -1076,7 +1077,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
||||||
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
|
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
|
||||||
|
|
||||||
bool groupbyTbname(SNodeList* pGroupList);
|
bool groupbyTbname(SNodeList* pGroupList);
|
||||||
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey, bool groupSort);
|
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey);
|
||||||
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
||||||
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
|
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
|
||||||
SGroupResInfo* pGroupResInfo);
|
SGroupResInfo* pGroupResInfo);
|
||||||
|
|
|
@ -48,10 +48,6 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
|
||||||
int32_t numOfCols = 0;
|
int32_t numOfCols = 0;
|
||||||
code =
|
code =
|
||||||
extractColMatchInfo(pScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
|
extractColMatchInfo(pScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
|
|
||||||
removeRedundantTsCol(pScanNode, &pInfo->matchInfo);
|
removeRedundantTsCol(pScanNode, &pInfo->matchInfo);
|
||||||
|
|
||||||
code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds);
|
code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds);
|
||||||
|
@ -65,15 +61,11 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
|
||||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||||
pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));
|
pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));
|
||||||
|
|
||||||
// partition by tbname, todo opt perf
|
// partition by tbname
|
||||||
if (oneTableForEachGroup(pTableList) || (getTotalTables(pTableList) == 1)) {
|
if (taosArrayGetSize(pTableList->pGroupList) == taosArrayGetSize(pTableList->pTableList)) {
|
||||||
pInfo->retrieveType =
|
pInfo->retrieveType =
|
||||||
CACHESCAN_RETRIEVE_TYPE_ALL | (pScanNode->ignoreNull ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW);
|
CACHESCAN_RETRIEVE_TYPE_ALL | (pScanNode->ignoreNull ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW);
|
||||||
|
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList,
|
||||||
STableKeyInfo* pList = taosArrayGet(pTableList->pTableList, 0);
|
|
||||||
size_t num = taosArrayGetSize(pTableList->pTableList);
|
|
||||||
|
|
||||||
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
|
|
||||||
taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader);
|
taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
|
@ -175,7 +167,16 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->pRes->info.groupId = getTableGroupId(pTableList, pInfo->pRes->info.uid);
|
if (pTableList->map != NULL) {
|
||||||
|
int64_t* groupId = taosHashGet(pTableList->map, &pInfo->pRes->info.uid, sizeof(int64_t));
|
||||||
|
if (groupId != NULL) {
|
||||||
|
pInfo->pRes->info.groupId = *groupId;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ASSERT(taosArrayGetSize(pTableList->pTableList) == 1);
|
||||||
|
STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, 0);
|
||||||
|
pInfo->pRes->info.groupId = pKeyInfo->groupId;
|
||||||
|
}
|
||||||
|
|
||||||
pInfo->indexOfBufferedRes += 1;
|
pInfo->indexOfBufferedRes += 1;
|
||||||
return pInfo->pRes;
|
return pInfo->pRes;
|
||||||
|
@ -184,25 +185,18 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
size_t totalGroups = getNumOfOutputGroups(pTableList);
|
size_t totalGroups = taosArrayGetSize(pTableList->pGroupList);
|
||||||
|
|
||||||
while (pInfo->currentGroupIndex < totalGroups) {
|
while (pInfo->currentGroupIndex < totalGroups) {
|
||||||
|
SArray* pGroupTableList = taosArrayGetP(pTableList->pGroupList, pInfo->currentGroupIndex);
|
||||||
|
|
||||||
STableKeyInfo* pList = NULL;
|
tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pGroupTableList,
|
||||||
int32_t num = 0;
|
|
||||||
|
|
||||||
int32_t code = getTablesOfGroup(pTableList, pInfo->currentGroupIndex, &pList, &num);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
|
||||||
}
|
|
||||||
|
|
||||||
tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
|
|
||||||
taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader);
|
taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader);
|
||||||
taosArrayClear(pInfo->pUidList);
|
taosArrayClear(pInfo->pUidList);
|
||||||
|
|
||||||
code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList);
|
int32_t code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
longjmp(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->currentGroupIndex += 1;
|
pInfo->currentGroupIndex += 1;
|
||||||
|
@ -212,7 +206,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
if (pInfo->pseudoExprSup.numOfExprs > 0) {
|
if (pInfo->pseudoExprSup.numOfExprs > 0) {
|
||||||
SExprSupp* pSup = &pInfo->pseudoExprSup;
|
SExprSupp* pSup = &pInfo->pseudoExprSup;
|
||||||
|
|
||||||
STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pTableList)[0];
|
STableKeyInfo* pKeyInfo = taosArrayGet(pGroupTableList, 0);
|
||||||
pInfo->pRes->info.groupId = pKeyInfo->groupId;
|
pInfo->pRes->info.groupId = pKeyInfo->groupId;
|
||||||
|
|
||||||
if (taosArrayGetSize(pInfo->pUidList) > 0) {
|
if (taosArrayGetSize(pInfo->pUidList) > 0) {
|
||||||
|
|
|
@ -544,7 +544,6 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx.index = 0;
|
ctx.index = 0;
|
||||||
ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
|
ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
|
||||||
if (ctx.cInfoList == NULL) {
|
if (ctx.cInfoList == NULL) {
|
||||||
|
@ -607,7 +606,6 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
|
||||||
} else {
|
} else {
|
||||||
void* tag = taosHashGet(tags, uid, sizeof(int64_t));
|
void* tag = taosHashGet(tags, uid, sizeof(int64_t));
|
||||||
ASSERT(tag);
|
ASSERT(tag);
|
||||||
|
|
||||||
STagVal tagVal = {0};
|
STagVal tagVal = {0};
|
||||||
tagVal.cid = pColInfo->info.colId;
|
tagVal.cid = pColInfo->info.colId;
|
||||||
const char* p = metaGetTableTagVal(tag, pColInfo->info.type, &tagVal);
|
const char* p = metaGetTableTagVal(tag, pColInfo->info.type, &tagVal);
|
||||||
|
@ -638,7 +636,6 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pResBlock->info.rows = rows;
|
pResBlock->info.rows = rows;
|
||||||
|
|
||||||
// int64_t st1 = taosGetTimestampUs();
|
// int64_t st1 = taosGetTimestampUs();
|
||||||
|
@ -664,12 +661,10 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
code = TSDB_CODE_OPS_NOT_SUPPORT;
|
code = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nodeType(pNode) == QUERY_NODE_COLUMN) {
|
if (nodeType(pNode) == QUERY_NODE_COLUMN) {
|
||||||
SColumnNode* pSColumnNode = (SColumnNode*)pNode;
|
SColumnNode* pSColumnNode = (SColumnNode*)pNode;
|
||||||
SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId);
|
SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId);
|
||||||
|
@ -679,12 +674,10 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
|
||||||
} else {
|
} else {
|
||||||
code = scalarCalculate(pNode, pBlockList, &output);
|
code = scalarCalculate(pNode, pBlockList, &output);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
releaseColInfoData(output.columnData);
|
releaseColInfoData(output.columnData);
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(groupData, &output.columnData);
|
taosArrayPush(groupData, &output.columnData);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -703,7 +696,6 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < rows; i++) {
|
for (int i = 0; i < rows; i++) {
|
||||||
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
||||||
|
|
||||||
|
@ -825,86 +817,38 @@ static int32_t removeInvalidTable(SArray* uids, SHashObj* tags) {
|
||||||
taosArrayDestroy(validUid);
|
taosArrayDestroy(validUid);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t nameComparFn(const void* p1, const void* p2) {
|
|
||||||
const char* pName1 = *(const char**) p1;
|
|
||||||
const char* pName2 = *(const char**) p2;
|
|
||||||
|
|
||||||
int32_t ret = strcmp(pName1, pName2);
|
|
||||||
if (ret == 0) {
|
|
||||||
return 0;
|
|
||||||
} else {
|
|
||||||
return (ret > 0)? 1:-1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static SArray* getTableNameList(const SNodeListNode* pList) {
|
|
||||||
int32_t len = LIST_LENGTH(pList->pNodeList);
|
|
||||||
SListCell* cell = pList->pNodeList->pHead;
|
|
||||||
|
|
||||||
SArray* pTbList = taosArrayInit(len, POINTER_BYTES);
|
|
||||||
for (int i = 0; i < pList->pNodeList->length; i++) {
|
|
||||||
SValueNode* valueNode = (SValueNode*) cell->pNode;
|
|
||||||
if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
|
|
||||||
terrno = TSDB_CODE_INVALID_PARA;
|
|
||||||
taosArrayDestroy(pTbList);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
char* name = varDataVal(valueNode->datum.p);
|
|
||||||
taosArrayPush(pTbList, &name);
|
|
||||||
cell = cell->pNext;
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t numOfTables = taosArrayGetSize(pTbList);
|
|
||||||
|
|
||||||
// order the name
|
|
||||||
taosArraySort(pTbList, nameComparFn);
|
|
||||||
|
|
||||||
// remove the duplicates
|
|
||||||
SArray* pNewList = taosArrayInit(taosArrayGetSize(pTbList), sizeof(void*));
|
|
||||||
taosArrayPush(pNewList, taosArrayGet(pTbList, 0));
|
|
||||||
|
|
||||||
for (int32_t i = 1; i < numOfTables; ++i) {
|
|
||||||
char** name = taosArrayGetLast(pNewList);
|
|
||||||
char** nameInOldList = taosArrayGet(pTbList, i);
|
|
||||||
if (strcmp(*name, *nameInOldList) == 0) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayPush(pNewList, nameInOldList);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayDestroy(pTbList);
|
|
||||||
return pNewList;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond) {
|
static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond) {
|
||||||
if (nodeType(pTagCond) != QUERY_NODE_OPERATOR) {
|
if (nodeType(pTagCond) != QUERY_NODE_OPERATOR) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorNode* pNode = (SOperatorNode*)pTagCond;
|
SOperatorNode* pNode = (SOperatorNode*)pTagCond;
|
||||||
if (pNode->opType != OP_TYPE_IN) {
|
if (pNode->opType != OP_TYPE_IN) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((pNode->pLeft != NULL && nodeType(pNode->pLeft) == QUERY_NODE_COLUMN &&
|
if ((pNode->pLeft != NULL && nodeType(pNode->pLeft) == QUERY_NODE_COLUMN &&
|
||||||
((SColumnNode*)pNode->pLeft)->colType == COLUMN_TYPE_TBNAME) &&
|
((SColumnNode*)pNode->pLeft)->colType == COLUMN_TYPE_TBNAME) &&
|
||||||
(pNode->pRight != NULL && nodeType(pNode->pRight) == QUERY_NODE_NODE_LIST)) {
|
(pNode->pRight != NULL && nodeType(pNode->pRight) == QUERY_NODE_NODE_LIST)) {
|
||||||
SNodeListNode* pList = (SNodeListNode*)pNode->pRight;
|
SNodeListNode* pList = (SNodeListNode*)pNode->pRight;
|
||||||
|
|
||||||
int32_t len = LIST_LENGTH(pList->pNodeList);
|
int32_t len = LIST_LENGTH(pList->pNodeList);
|
||||||
if (len <= 0) {
|
if (len <= 0) return -1;
|
||||||
|
|
||||||
|
SListCell* cell = pList->pNodeList->pHead;
|
||||||
|
|
||||||
|
SArray* pTbList = taosArrayInit(len, sizeof(void*));
|
||||||
|
for (int i = 0; i < pList->pNodeList->length; i++) {
|
||||||
|
SValueNode* valueNode = (SValueNode*)cell->pNode;
|
||||||
|
if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
|
||||||
|
taosArrayDestroy(pTbList);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
char* name = varDataVal(valueNode->datum.p);
|
||||||
|
taosArrayPush(pTbList, &name);
|
||||||
|
cell = cell->pNext;
|
||||||
|
}
|
||||||
|
|
||||||
SArray* pTbList = getTableNameList(pList);
|
for (int i = 0; i < taosArrayGetSize(pTbList); i++) {
|
||||||
int32_t numOfTables = taosArrayGetSize(pTbList);
|
|
||||||
|
|
||||||
for (int i = 0; i < numOfTables; i++) {
|
|
||||||
char* name = taosArrayGetP(pTbList, i);
|
char* name = taosArrayGetP(pTbList, i);
|
||||||
|
|
||||||
uint64_t uid = 0;
|
uint64_t uid = 0;
|
||||||
if (metaGetTableUidByName(metaHandle, name, &uid) == 0) {
|
if (metaGetTableUidByName(metaHandle, name, &uid) == 0) {
|
||||||
ETableType tbType = TSDB_TABLE_MAX;
|
ETableType tbType = TSDB_TABLE_MAX;
|
||||||
|
@ -919,14 +863,11 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray*
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(pTbList);
|
taosArrayDestroy(pTbList);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
|
int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
|
||||||
STableListInfo* pListInfo) {
|
STableListInfo* pListInfo) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -1005,6 +946,14 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(res);
|
taosArrayDestroy(res);
|
||||||
|
|
||||||
|
pListInfo->pGroupList = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
if (pListInfo->pGroupList == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
// put into list as default group, remove it if grouping sorting is required later
|
||||||
|
taosArrayPush(pListInfo->pGroupList, &pListInfo->pTableList);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1655,81 +1604,3 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimit
|
||||||
pLimitInfo->remainOffset = limit.offset;
|
pLimitInfo->remainOffset = limit.offset;
|
||||||
pLimitInfo->remainGroupOffset = slimit.offset;
|
pLimitInfo->remainGroupOffset = slimit.offset;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t getTotalTables(const STableListInfo* pTableList) {
|
|
||||||
if (pTableList->map != NULL) {
|
|
||||||
ASSERT(taosArrayGetSize(pTableList->pTableList) == taosHashGetSize(pTableList->map));
|
|
||||||
}
|
|
||||||
|
|
||||||
return taosArrayGetSize(pTableList->pTableList);
|
|
||||||
}
|
|
||||||
|
|
||||||
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
|
|
||||||
if (pTableList->oneTableForEachGroup) {
|
|
||||||
return tableUid;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint64_t* groupId = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid));
|
|
||||||
if (groupId != NULL) {
|
|
||||||
return *groupId;
|
|
||||||
} else {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid) {
|
|
||||||
STableKeyInfo keyInfo = {.uid = uid, .groupId = gid};
|
|
||||||
|
|
||||||
taosArrayPush(pTableList->pTableList, &keyInfo);
|
|
||||||
if (pTableList->oneTableForEachGroup || pTableList->numOfOuputGroups > 1) {
|
|
||||||
taosHashPut(pTableList->map, &uid, sizeof(uid), &keyInfo.groupId, sizeof(keyInfo.groupId));
|
|
||||||
}
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo, int32_t* size) {
|
|
||||||
int32_t total = getNumOfOutputGroups(pTableList);
|
|
||||||
if (ordinalGroupIndex < 0 || ordinalGroupIndex >= total) {
|
|
||||||
return TSDB_CODE_INVALID_PARA;
|
|
||||||
}
|
|
||||||
|
|
||||||
// here handle two special cases:
|
|
||||||
// 1. only one group exists, and 2. one table exists for each group.
|
|
||||||
if (total == 1) {
|
|
||||||
*size = getTotalTables(pTableList);
|
|
||||||
*pKeyInfo = taosArrayGet(pTableList->pTableList, 0);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
} else if (total == getTotalTables(pTableList)) {
|
|
||||||
*size = 1;
|
|
||||||
*pKeyInfo = taosArrayGet(pTableList->pTableList, ordinalGroupIndex);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t offset = pTableList->groupOffset[ordinalGroupIndex];
|
|
||||||
if (ordinalGroupIndex < total - 1) {
|
|
||||||
*size = pTableList->groupOffset[offset + 1] - pTableList->groupOffset[offset];
|
|
||||||
} else {
|
|
||||||
*size = total - pTableList->groupOffset[offset] - 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
*pKeyInfo = taosArrayGet(pTableList->pTableList, offset);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t getNumOfOutputGroups(const STableListInfo* pTableList) {
|
|
||||||
return pTableList->numOfOuputGroups;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool oneTableForEachGroup(const STableListInfo* pTableList) {
|
|
||||||
return pTableList->oneTableForEachGroup;
|
|
||||||
}
|
|
||||||
|
|
||||||
void destroyTableList(STableListInfo* pTableqinfoList) {
|
|
||||||
pTableqinfoList->pTableList = taosArrayDestroy(pTableqinfoList->pTableList);
|
|
||||||
taosMemoryFreeClear(pTableqinfoList->groupOffset);
|
|
||||||
|
|
||||||
taosHashCleanup(pTableqinfoList->map);
|
|
||||||
|
|
||||||
pTableqinfoList->pTableList = NULL;
|
|
||||||
pTableqinfoList->map = NULL;
|
|
||||||
}
|
|
|
@ -293,7 +293,9 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
||||||
qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), pTaskInfo->id.str);
|
qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), pTaskInfo->id.str);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pListInfo->map == NULL) {
|
||||||
|
pListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||||
|
}
|
||||||
|
|
||||||
// traverse to the stream scanner node to add this table id
|
// traverse to the stream scanner node to add this table id
|
||||||
SOperatorInfo* pInfo = pTaskInfo->pRoot;
|
SOperatorInfo* pInfo = pTaskInfo->pRoot;
|
||||||
|
@ -305,10 +307,8 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
||||||
SStreamScanInfo* pScanInfo = pInfo->info;
|
SStreamScanInfo* pScanInfo = pInfo->info;
|
||||||
if (isAdd) { // add new table id
|
if (isAdd) { // add new table id
|
||||||
SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, GET_TASKID(pTaskInfo));
|
SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, GET_TASKID(pTaskInfo));
|
||||||
int32_t numOfQualifiedTables = taosArrayGetSize(qa);
|
|
||||||
|
|
||||||
qDebug(" %d qualified child tables added into stream scanner", numOfQualifiedTables);
|
|
||||||
|
|
||||||
|
qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa));
|
||||||
code = tqReaderAddTbUidList(pScanInfo->tqReader, qa);
|
code = tqReaderAddTbUidList(pScanInfo->tqReader, qa);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
taosArrayDestroy(qa);
|
taosArrayDestroy(qa);
|
||||||
|
@ -328,9 +328,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
STableListInfo* pTableListInfo = &pTaskInfo->tableqinfoList;
|
for (int32_t i = 0; i < taosArrayGetSize(qa); ++i) {
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfQualifiedTables; ++i) {
|
|
||||||
uint64_t* uid = taosArrayGet(qa, i);
|
uint64_t* uid = taosArrayGet(qa, i);
|
||||||
STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0};
|
STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0};
|
||||||
|
|
||||||
|
@ -360,7 +358,8 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
||||||
|
|
||||||
if (!exists) {
|
if (!exists) {
|
||||||
#endif
|
#endif
|
||||||
addTableIntoTableList(pTableListInfo, keyInfo.uid, keyInfo.groupId);
|
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo);
|
||||||
|
taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(*uid), &keyInfo.groupId, sizeof(keyInfo.groupId));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (keyBuf != NULL) {
|
if (keyBuf != NULL) {
|
||||||
|
@ -936,7 +935,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
/*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/
|
/*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/
|
||||||
/*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/
|
/*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/
|
||||||
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||||
int32_t numOfTables = getTotalTables(&pTaskInfo->tableqinfoList);
|
int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
|
||||||
|
|
||||||
#ifndef NDEBUG
|
#ifndef NDEBUG
|
||||||
qDebug("switch to next table %" PRId64 " (cursor %d), %" PRId64 " rows returned", uid,
|
qDebug("switch to next table %" PRId64 " (cursor %d), %" PRId64 " rows returned", uid,
|
||||||
|
@ -945,7 +944,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
bool found = false;
|
bool found = false;
|
||||||
for (int32_t i = 0; i < numOfTables; i++) {
|
for (int32_t i = 0; i < tableSz; i++) {
|
||||||
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
|
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
|
||||||
if (pTableInfo->uid == uid) {
|
if (pTableInfo->uid == uid) {
|
||||||
found = true;
|
found = true;
|
||||||
|
@ -958,17 +957,14 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
ASSERT(found);
|
ASSERT(found);
|
||||||
|
|
||||||
if (pTableScanInfo->dataReader == NULL) {
|
if (pTableScanInfo->dataReader == NULL) {
|
||||||
STableKeyInfo* pList = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
|
if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond,
|
||||||
int32_t num = getTotalTables(&pTaskInfo->tableqinfoList);
|
pTaskInfo->tableqinfoList.pTableList, &pTableScanInfo->dataReader, NULL) < 0 ||
|
||||||
|
pTableScanInfo->dataReader == NULL) {
|
||||||
if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond, pList, num,
|
|
||||||
&pTableScanInfo->dataReader, NULL) < 0 || pTableScanInfo->dataReader == NULL) {
|
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
STableKeyInfo tki = {.uid = uid};
|
tsdbSetTableId(pTableScanInfo->dataReader, uid);
|
||||||
tsdbSetTableList(pTableScanInfo->dataReader, &tki, 1);
|
|
||||||
int64_t oldSkey = pTableScanInfo->cond.twindows.skey;
|
int64_t oldSkey = pTableScanInfo->cond.twindows.skey;
|
||||||
pTableScanInfo->cond.twindows.skey = ts + 1;
|
pTableScanInfo->cond.twindows.skey = ts + 1;
|
||||||
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
||||||
|
@ -976,7 +972,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
pTableScanInfo->scanTimes = 0;
|
pTableScanInfo->scanTimes = 0;
|
||||||
|
|
||||||
qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid,
|
qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid,
|
||||||
ts, pTableScanInfo->currentTable, numOfTables);
|
ts, pTableScanInfo->currentTable, tableSz);
|
||||||
/*}*/
|
/*}*/
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -998,15 +994,9 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
|
|
||||||
initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo);
|
initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo);
|
||||||
pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
|
pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
|
||||||
|
pTaskInfo->tableqinfoList.pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
|
||||||
STableListInfo* pListInfo = &pTaskInfo->tableqinfoList;
|
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0});
|
||||||
|
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList,
|
||||||
pListInfo->pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
|
|
||||||
taosArrayPush(pListInfo->pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0});
|
|
||||||
|
|
||||||
STableKeyInfo* pList = taosArrayGet(pListInfo->pTableList, 0);
|
|
||||||
|
|
||||||
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, taosArrayGetSize(pListInfo->pTableList),
|
|
||||||
&pInfo->dataReader, NULL);
|
&pInfo->dataReader, NULL);
|
||||||
|
|
||||||
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
|
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
|
||||||
|
|
|
@ -1739,6 +1739,8 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void doDestroyTableList(STableListInfo* pTableqinfoList);
|
||||||
|
|
||||||
typedef struct SFetchRspHandleWrapper {
|
typedef struct SFetchRspHandleWrapper {
|
||||||
uint32_t exchangeId;
|
uint32_t exchangeId;
|
||||||
int32_t sourceIndex;
|
int32_t sourceIndex;
|
||||||
|
@ -3364,116 +3366,62 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
|
||||||
|
|
||||||
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSSchemaWrapper(pStreamInfo->schema); }
|
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSSchemaWrapper(pStreamInfo->schema); }
|
||||||
|
|
||||||
static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) {
|
|
||||||
STableKeyInfo* pInfo1 = (STableKeyInfo*) p1;
|
|
||||||
STableKeyInfo* pInfo2 = (STableKeyInfo*) p2;
|
|
||||||
|
|
||||||
if (pInfo1->groupId == pInfo2->groupId) {
|
|
||||||
return 0;
|
|
||||||
} else {
|
|
||||||
return pInfo1->groupId < pInfo2->groupId? -1:1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
|
static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
taosArrayClear(pTableListInfo->pGroupList);
|
||||||
|
|
||||||
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
|
|
||||||
int32_t size = getTotalTables(pTableListInfo);
|
|
||||||
|
|
||||||
SArray* pList = taosArrayInit(4, sizeof(int32_t));
|
|
||||||
|
|
||||||
STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0);
|
|
||||||
uint64_t gid = pInfo->groupId;
|
|
||||||
|
|
||||||
int32_t start = 0;
|
|
||||||
taosArrayPush(pList, &start);
|
|
||||||
|
|
||||||
for(int32_t i = 1; i < size; ++i) {
|
|
||||||
pInfo = taosArrayGet(pTableListInfo->pTableList, i);
|
|
||||||
if (pInfo->groupId != gid) {
|
|
||||||
taosArrayPush(pList, &i);
|
|
||||||
gid = pInfo->groupId;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pTableListInfo->numOfOuputGroups = taosArrayGetSize(pList);
|
|
||||||
pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
|
|
||||||
memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
|
|
||||||
taosArrayDestroy(pList);
|
|
||||||
|
|
||||||
# if 0
|
|
||||||
|
|
||||||
SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t));
|
SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t));
|
||||||
if (sortSupport == NULL) {
|
if (sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
|
||||||
}
|
|
||||||
|
|
||||||
size_t num = taosArrayGetSize(pTableListInfo->pTableList);
|
|
||||||
for (int32_t i = 0; i < num; i++) {
|
|
||||||
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
||||||
uint64_t* groupId = taosHashGet(pTableListInfo->map, &info->uid, sizeof(uint64_t));
|
uint64_t* groupId = taosHashGet(pTableListInfo->map, &info->uid, sizeof(uint64_t));
|
||||||
|
|
||||||
int32_t index = taosArraySearchIdx(sortSupport, groupId, compareUint64Val, TD_EQ);
|
int32_t index = taosArraySearchIdx(sortSupport, groupId, compareUint64Val, TD_EQ);
|
||||||
if (index == -1) {
|
if (index == -1) {
|
||||||
void* p = taosArraySearch(sortSupport, groupId, compareUint64Val, TD_GT);
|
void* p = taosArraySearch(sortSupport, groupId, compareUint64Val, TD_GT);
|
||||||
|
|
||||||
SArray* tGroup = taosArrayInit(8, sizeof(STableKeyInfo));
|
SArray* tGroup = taosArrayInit(8, sizeof(STableKeyInfo));
|
||||||
if (tGroup == NULL) {
|
if (tGroup == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
taosArrayDestroy(sortSupport);
|
||||||
goto _error;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosArrayPush(tGroup, info) == NULL) {
|
if (taosArrayPush(tGroup, info) == NULL) {
|
||||||
qError("taos push info array error");
|
qError("taos push info array error");
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
taosArrayDestroy(sortSupport);
|
||||||
goto _error;
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
if (taosArrayPush(sortSupport, groupId) == NULL) {
|
if (taosArrayPush(sortSupport, groupId) == NULL) {
|
||||||
qError("taos push support array error");
|
qError("taos push support array error");
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
taosArrayDestroy(sortSupport);
|
||||||
goto _error;
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosArrayPush(pTableListInfo->pGroupList, &tGroup) == NULL) {
|
if (taosArrayPush(pTableListInfo->pGroupList, &tGroup) == NULL) {
|
||||||
qError("taos push group array error");
|
qError("taos push group array error");
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
taosArrayDestroy(sortSupport);
|
||||||
goto _error;
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
int32_t pos = TARRAY_ELEM_IDX(sortSupport, p);
|
int32_t pos = TARRAY_ELEM_IDX(sortSupport, p);
|
||||||
if (taosArrayInsert(sortSupport, pos, groupId) == NULL) {
|
if (taosArrayInsert(sortSupport, pos, groupId) == NULL) {
|
||||||
qError("taos insert support array error");
|
qError("taos insert support array error");
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
taosArrayDestroy(sortSupport);
|
||||||
goto _error;
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosArrayInsert(pTableListInfo->pGroupList, pos, &tGroup) == NULL) {
|
if (taosArrayInsert(pTableListInfo->pGroupList, pos, &tGroup) == NULL) {
|
||||||
qError("taos insert group array error");
|
qError("taos insert group array error");
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
taosArrayDestroy(sortSupport);
|
||||||
goto _error;
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
SArray* tGroup = (SArray*)taosArrayGetP(pTableListInfo->pGroupList, index);
|
SArray* tGroup = (SArray*)taosArrayGetP(pTableListInfo->pGroupList, index);
|
||||||
if (taosArrayPush(tGroup, info) == NULL) {
|
if (taosArrayPush(tGroup, info) == NULL) {
|
||||||
qError("taos push uid array error");
|
qError("taos push uid array error");
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayDestroy(sortSupport);
|
taosArrayDestroy(sortSupport);
|
||||||
#endif
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taosArrayDestroy(sortSupport);
|
||||||
return TDB_CODE_SUCCESS;
|
return TDB_CODE_SUCCESS;
|
||||||
|
|
||||||
_error:
|
|
||||||
// taosArrayDestroy(sortSupport);
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool groupbyTbname(SNodeList* pGroupList) {
|
bool groupbyTbname(SNodeList* pGroupList) {
|
||||||
|
@ -3489,44 +3437,38 @@ bool groupbyTbname(SNodeList* pGroupList) {
|
||||||
return bytbname;
|
return bytbname;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort) {
|
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
if (group == NULL) {
|
if (group == NULL) {
|
||||||
return code;
|
return TDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||||
if (pTableListInfo->map == NULL) {
|
if (pTableListInfo->map == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool assignUid = groupbyTbname(group);
|
bool assignUid = groupbyTbname(group);
|
||||||
|
|
||||||
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
|
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
|
||||||
|
|
||||||
if (assignUid) { // in case of group/partition by tbname, the group id is equalled to the uid of table
|
if (assignUid) {
|
||||||
for (int32_t i = 0; i < numOfTables; i++) {
|
for (int32_t i = 0; i < numOfTables; i++) {
|
||||||
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
||||||
info->groupId = info->uid;
|
info->groupId = info->uid;
|
||||||
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t));
|
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t));
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableListInfo->oneTableForEachGroup = true;
|
|
||||||
if (groupSort) {
|
|
||||||
pTableListInfo->numOfOuputGroups = numOfTables;
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
|
int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (groupSort) {
|
|
||||||
code = sortTableGroup(pTableListInfo);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
if (pTableListInfo->needSortTableByGroupId) {
|
||||||
|
return sortTableGroup(pTableListInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
return TDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) {
|
static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) {
|
||||||
|
@ -3563,12 +3505,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
|
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
|
||||||
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
// NOTE: this is an patch to fix the physical plan
|
|
||||||
// TODO remove it later
|
|
||||||
if (pTableScanNode->scan.node.pLimit != NULL) {
|
|
||||||
pTableScanNode->groupSort = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code =
|
int32_t code =
|
||||||
createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
|
createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
|
||||||
pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
|
pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
|
||||||
|
@ -3627,10 +3563,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
int32_t sz = taosArrayGetSize(pTableListInfo->pTableList);
|
int32_t sz = taosArrayGetSize(pTableListInfo->pTableList);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
STableKeyInfo* pKeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
|
STableKeyInfo* pKeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
|
||||||
qDebug("creating stream task: add table uid:%" PRIu64, pKeyInfo->uid);
|
qDebug("creating stream task: add table %" PRId64, pKeyInfo->uid);
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("table in hashmap, %d", (int32_t) getTotalTables(pTableListInfo));
|
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3665,20 +3599,13 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
SQueryTableDataCond cond = {0};
|
SQueryTableDataCond cond = {0};
|
||||||
|
|
||||||
int32_t code = initTableblockDistQueryCond(pBlockNode->suid, &cond);
|
int32_t code = initTableblockDistQueryCond(pBlockNode->suid, &cond);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t num = getTotalTables(pTableListInfo);
|
|
||||||
void* pList = NULL;
|
|
||||||
if (num > 0) {
|
|
||||||
pList = taosArrayGet(pTableListInfo->pTableList, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
STsdbReader* pReader = NULL;
|
STsdbReader* pReader = NULL;
|
||||||
tsdbReaderOpen(pHandle->vnode, &cond, pList, num, &pReader, "");
|
tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, "");
|
||||||
cleanupQueryTableDataCond(&cond);
|
cleanupQueryTableDataCond(&cond);
|
||||||
|
|
||||||
pOperator = createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo);
|
pOperator = createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo);
|
||||||
|
@ -3713,7 +3640,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t size = LIST_LENGTH(pPhyNode->pChildren);
|
size_t size = LIST_LENGTH(pPhyNode->pChildren);
|
||||||
|
|
||||||
SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
|
SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
|
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
|
||||||
|
@ -4049,10 +3975,28 @@ _complete:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void doDestroyTableList(STableListInfo* pTableqinfoList) {
|
||||||
|
taosArrayDestroy(pTableqinfoList->pTableList);
|
||||||
|
taosHashCleanup(pTableqinfoList->map);
|
||||||
|
if (pTableqinfoList->needSortTableByGroupId) {
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++) {
|
||||||
|
SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i);
|
||||||
|
if (tmp == pTableqinfoList->pTableList) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
taosArrayDestroy(tmp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taosArrayDestroy(pTableqinfoList->pGroupList);
|
||||||
|
|
||||||
|
pTableqinfoList->pTableList = NULL;
|
||||||
|
pTableqinfoList->map = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
void doDestroyTask(SExecTaskInfo* pTaskInfo) {
|
void doDestroyTask(SExecTaskInfo* pTaskInfo) {
|
||||||
qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));
|
qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));
|
||||||
|
|
||||||
destroyTableList(&pTaskInfo->tableqinfoList);
|
doDestroyTableList(&pTaskInfo->tableqinfoList);
|
||||||
destroyOperatorInfo(pTaskInfo->pRoot);
|
destroyOperatorInfo(pTaskInfo->pRoot);
|
||||||
cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);
|
cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);
|
||||||
cleanupStreamInfo(&pTaskInfo->streamInfo);
|
cleanupStreamInfo(&pTaskInfo->streamInfo);
|
||||||
|
|
|
@ -159,7 +159,6 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S
|
||||||
|
|
||||||
// reset the value for a new group data
|
// reset the value for a new group data
|
||||||
// existing rows that belongs to previous group.
|
// existing rows that belongs to previous group.
|
||||||
// TODO refactor with doTableScan
|
|
||||||
pLimitInfo->numOfOutputRows = 0;
|
pLimitInfo->numOfOutputRows = 0;
|
||||||
pLimitInfo->remainOffset = pLimitInfo->limit.offset;
|
pLimitInfo->remainOffset = pLimitInfo->limit.offset;
|
||||||
}
|
}
|
||||||
|
|
|
@ -377,7 +377,9 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
|
||||||
int32_t keep = pBlock->info.rows - overflowRows;
|
int32_t keep = pBlock->info.rows - overflowRows;
|
||||||
|
|
||||||
blockDataKeepFirstNRows(pBlock, keep);
|
blockDataKeepFirstNRows(pBlock, keep);
|
||||||
qDebug("output limit %"PRId64" has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo));
|
qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo));
|
||||||
|
|
||||||
|
// setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -681,7 +683,10 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
|
||||||
if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
|
if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
|
||||||
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
||||||
pTableScanInfo->scanFlag = REPEAT_SCAN;
|
pTableScanInfo->scanFlag = REPEAT_SCAN;
|
||||||
qDebug("start to repeat ascending order scan data blocks due to query func required, %s", GET_TASKID(pTaskInfo));
|
qDebug(
|
||||||
|
"%s start to repeat ascending order scan data SELECT last_row(*),hostname from cpu group by hostname;blocks "
|
||||||
|
"due to query func required",
|
||||||
|
GET_TASKID(pTaskInfo));
|
||||||
|
|
||||||
// do prepare for the next round table scan operation
|
// do prepare for the next round table scan operation
|
||||||
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
||||||
|
@ -708,7 +713,8 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
|
||||||
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
||||||
pTableScanInfo->scanFlag = REPEAT_SCAN;
|
pTableScanInfo->scanFlag = REPEAT_SCAN;
|
||||||
|
|
||||||
qDebug("%s start to repeat descending order scan data blocks", GET_TASKID(pTaskInfo));
|
qDebug("%s start to repeat descending order scan data blocks due to query func required",
|
||||||
|
GET_TASKID(pTaskInfo));
|
||||||
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -721,7 +727,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
STableScanInfo* pInfo = pOperator->info;
|
STableScanInfo* pInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
// scan table one by one sequentially
|
// if scan table by table
|
||||||
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
|
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
|
||||||
int32_t numOfTables = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
|
int32_t numOfTables = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
|
||||||
|
|
||||||
|
@ -738,63 +744,54 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable);
|
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable);
|
||||||
tsdbSetTableList(pInfo->dataReader, pTableInfo, 1);
|
tsdbSetTableId(pInfo->dataReader, pTableInfo->uid);
|
||||||
qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables,
|
qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables,
|
||||||
pInfo->currentTable, pTaskInfo->id.str);
|
pInfo->currentTable, pTaskInfo->id.str);
|
||||||
|
|
||||||
tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
|
tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
|
||||||
pInfo->scanTimes = 0;
|
pInfo->scanTimes = 0;
|
||||||
}
|
}
|
||||||
} else { // scan table group by group sequentially
|
}
|
||||||
|
|
||||||
if (pInfo->currentGroupId == -1) {
|
if (pInfo->currentGroupId == -1) {
|
||||||
if ((++pInfo->currentGroupId) >= getNumOfOutputGroups(&pTaskInfo->tableqinfoList)) {
|
pInfo->currentGroupId++;
|
||||||
doSetOperatorCompleted(pOperator);
|
if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) {
|
||||||
|
setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t num = 0;
|
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);
|
||||||
STableKeyInfo* pList = NULL;
|
tsdbReaderClose(pInfo->dataReader);
|
||||||
getTablesOfGroup(&pTaskInfo->tableqinfoList, pInfo->currentGroupId, &pList, &num);
|
|
||||||
ASSERT(pInfo->dataReader == NULL);
|
|
||||||
|
|
||||||
int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, pList, num, (STsdbReader**)&pInfo->dataReader,
|
int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, (STsdbReader**)&pInfo->dataReader,
|
||||||
GET_TASKID(pTaskInfo));
|
GET_TASKID(pTaskInfo));
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* result = doTableScanGroup(pOperator);
|
SSDataBlock* result = doTableScanGroup(pOperator);
|
||||||
if (result != NULL) {
|
if (result) {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((++pInfo->currentGroupId) >= getNumOfOutputGroups(&pTaskInfo->tableqinfoList)) {
|
pInfo->currentGroupId++;
|
||||||
doSetOperatorCompleted(pOperator);
|
if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) {
|
||||||
|
setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// reset value for the next group data output
|
|
||||||
pOperator->status = OP_OPENED;
|
|
||||||
pInfo->limitInfo.numOfOutputRows = 0;
|
|
||||||
pInfo->limitInfo.remainOffset = pInfo->limitInfo.limit.offset;
|
|
||||||
|
|
||||||
int32_t num = 0;
|
|
||||||
STableKeyInfo* pList = NULL;
|
|
||||||
getTablesOfGroup(&pTaskInfo->tableqinfoList, pInfo->currentGroupId, &pList, &num);
|
|
||||||
|
|
||||||
tsdbSetTableList(pInfo->dataReader, pList, num);
|
|
||||||
tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
|
tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
|
||||||
pInfo->scanTimes = 0;
|
pInfo->scanTimes = 0;
|
||||||
|
|
||||||
result = doTableScanGroup(pOperator);
|
result = doTableScanGroup(pOperator);
|
||||||
if (result != NULL) {
|
if (result) {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
doSetOperatorCompleted(pOperator);
|
setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
|
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
|
||||||
|
@ -840,6 +837,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
||||||
}
|
}
|
||||||
|
|
||||||
initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
|
initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
|
||||||
|
|
||||||
code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
|
code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
|
@ -1079,55 +1077,39 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
|
||||||
pTableScanInfo->cond.twindows = *pWin;
|
pTableScanInfo->cond.twindows = *pWin;
|
||||||
pTableScanInfo->scanTimes = 0;
|
pTableScanInfo->scanTimes = 0;
|
||||||
pTableScanInfo->currentGroupId = -1;
|
pTableScanInfo->currentGroupId = -1;
|
||||||
tsdbReaderClose(pTableScanInfo->dataReader);
|
}
|
||||||
pTableScanInfo->dataReader = NULL;
|
|
||||||
|
static void freeArray(void* array) { taosArrayDestroy(array); }
|
||||||
|
|
||||||
|
static void resetTableScanOperator(SOperatorInfo* pTableScanOp) {
|
||||||
|
STableScanInfo* pTableScanInfo = pTableScanOp->info;
|
||||||
|
pTableScanInfo->cond.startVersion = -1;
|
||||||
|
pTableScanInfo->cond.endVersion = -1;
|
||||||
|
SArray* gpTbls = pTableScanOp->pTaskInfo->tableqinfoList.pGroupList;
|
||||||
|
SArray* allTbls = pTableScanOp->pTaskInfo->tableqinfoList.pTableList;
|
||||||
|
taosArrayClearP(gpTbls, freeArray);
|
||||||
|
taosArrayPush(gpTbls, &allTbls);
|
||||||
|
STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
|
||||||
|
resetTableScanInfo(pTableScanOp->info, &win);
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
|
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
|
||||||
int64_t maxVersion) {
|
int64_t maxVersion) {
|
||||||
|
SArray* gpTbls = pTableScanOp->pTaskInfo->tableqinfoList.pGroupList;
|
||||||
|
taosArrayClear(gpTbls);
|
||||||
STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0};
|
STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0};
|
||||||
|
SArray* tbls = taosArrayInit(1, sizeof(STableKeyInfo));
|
||||||
|
taosArrayPush(tbls, &tblInfo);
|
||||||
|
taosArrayPush(gpTbls, &tbls);
|
||||||
|
|
||||||
|
STimeWindow win = {.skey = startTs, .ekey = endTs};
|
||||||
STableScanInfo* pTableScanInfo = pTableScanOp->info;
|
STableScanInfo* pTableScanInfo = pTableScanOp->info;
|
||||||
SQueryTableDataCond cond = pTableScanInfo->cond;
|
pTableScanInfo->cond.startVersion = -1;
|
||||||
|
pTableScanInfo->cond.endVersion = maxVersion;
|
||||||
cond.startVersion = -1;
|
resetTableScanInfo(pTableScanOp->info, &win);
|
||||||
cond.endVersion = maxVersion;
|
SSDataBlock* pRes = doTableScan(pTableScanOp);
|
||||||
cond.twindows = (STimeWindow){.skey = startTs, .ekey = endTs};
|
resetTableScanOperator(pTableScanOp);
|
||||||
|
return pRes;
|
||||||
SExecTaskInfo* pTaskInfo = pTableScanOp->pTaskInfo;
|
|
||||||
|
|
||||||
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
|
||||||
blockDataCleanup(pBlock);
|
|
||||||
|
|
||||||
STsdbReader* pReader = NULL;
|
|
||||||
int32_t code = tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &cond, &tblInfo, 1, (STsdbReader**)&pReader,
|
|
||||||
GET_TASKID(pTaskInfo));
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
terrno = code;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool hasBlock = tsdbNextDataBlock(pReader);
|
|
||||||
if (hasBlock) {
|
|
||||||
SDataBlockInfo binfo = {0};
|
|
||||||
tsdbRetrieveDataBlockInfo(pReader, &binfo);
|
|
||||||
|
|
||||||
SArray* pCols = tsdbRetrieveDataBlock(pReader, NULL);
|
|
||||||
blockDataEnsureCapacity(pBlock, binfo.rows);
|
|
||||||
|
|
||||||
pBlock->info.window = binfo.window;
|
|
||||||
pBlock->info.uid = binfo.uid;
|
|
||||||
pBlock->info.rows = binfo.rows;
|
|
||||||
|
|
||||||
relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true);
|
|
||||||
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
tsdbReaderClose(pReader);
|
|
||||||
qDebug("retrieve prev rows:%d, skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64
|
|
||||||
", suid:%" PRIu64, pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid);
|
|
||||||
|
|
||||||
return pBlock->info.rows > 0 ? pBlock : NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
|
static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
|
||||||
|
@ -2353,14 +2335,11 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
||||||
pTSInfo->cond.endVersion = pHandle->version;
|
pTSInfo->cond.endVersion = pHandle->version;
|
||||||
}
|
}
|
||||||
|
|
||||||
STableKeyInfo* pList = NULL;
|
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0);
|
||||||
int32_t num = 0;
|
|
||||||
getTablesOfGroup(&pTaskInfo->tableqinfoList, 0, &pList, &num);
|
|
||||||
|
|
||||||
if (pHandle->initTableReader) {
|
if (pHandle->initTableReader) {
|
||||||
pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
|
pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
|
||||||
pTSInfo->dataReader = NULL;
|
pTSInfo->dataReader = NULL;
|
||||||
if (tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, pList, num, &pTSInfo->dataReader, NULL) < 0) {
|
if (tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, tableList, &pTSInfo->dataReader, NULL) < 0) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -2388,7 +2367,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
||||||
// set the extract column id to streamHandle
|
// set the extract column id to streamHandle
|
||||||
tqReaderSetColIdList(pInfo->tqReader, pColIds);
|
tqReaderSetColIdList(pInfo->tqReader, pColIds);
|
||||||
SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList);
|
SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList);
|
||||||
code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList);
|
int32_t code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
taosArrayDestroy(tableIdList);
|
taosArrayDestroy(tableIdList);
|
||||||
goto _error;
|
goto _error;
|
||||||
|
@ -4174,9 +4153,6 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
||||||
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
|
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
|
||||||
int32_t code =
|
int32_t code =
|
||||||
extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
|
extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
|
code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -4234,8 +4210,8 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableListInfo->numOfOuputGroups = 1;
|
pTableListInfo->needSortTableByGroupId = groupSort;
|
||||||
code = generateGroupIdMap(pTableListInfo, pHandle, pGroupTags, groupSort);
|
code = generateGroupIdMap(pTableListInfo, pHandle, pGroupTags);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -4249,10 +4225,14 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
|
||||||
int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, STableListInfo* pTableListInfo,
|
int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, STableListInfo* pTableListInfo,
|
||||||
int32_t tableStartIdx, int32_t tableEndIdx, SArray* arrayReader, const char* idstr) {
|
int32_t tableStartIdx, int32_t tableEndIdx, SArray* arrayReader, const char* idstr) {
|
||||||
for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) {
|
for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) {
|
||||||
STableKeyInfo* pList = taosArrayGet(pTableListInfo->pTableList, i);
|
SArray* subTableList = taosArrayInit(1, sizeof(STableKeyInfo));
|
||||||
|
taosArrayPush(subTableList, taosArrayGet(pTableListInfo->pTableList, i));
|
||||||
|
|
||||||
STsdbReader* pReader = NULL;
|
STsdbReader* pReader = NULL;
|
||||||
tsdbReaderOpen(pHandle->vnode, pQueryCond, pList, 1, &pReader, idstr);
|
tsdbReaderOpen(pHandle->vnode, pQueryCond, subTableList, &pReader, idstr);
|
||||||
taosArrayPush(arrayReader, &pReader);
|
taosArrayPush(arrayReader, &pReader);
|
||||||
|
|
||||||
|
taosArrayDestroy(subTableList);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -654,7 +654,6 @@ _retry:
|
||||||
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
|
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
|
||||||
colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
|
colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->limitInfo.numOfOutputRows += p->info.rows;
|
pInfo->limitInfo.numOfOutputRows += p->info.rows;
|
||||||
pDataBlock->info.rows = p->info.rows;
|
pDataBlock->info.rows = p->info.rows;
|
||||||
pDataBlock->info.groupId = pInfo->groupId;
|
pDataBlock->info.groupId = pInfo->groupId;
|
||||||
|
|
|
@ -39,7 +39,6 @@ endi
|
||||||
$val = $totalNum - 1
|
$val = $totalNum - 1
|
||||||
sql select * from $stb limit $totalNum offset 1
|
sql select * from $stb limit $totalNum offset 1
|
||||||
if $rows != $val then
|
if $rows != $val then
|
||||||
print expect $val , actual: $rows
|
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data01 != 1 then
|
if $data01 != 1 then
|
||||||
|
@ -493,9 +492,3 @@ sql select max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8),
|
||||||
if $rows != 6 then
|
if $rows != 6 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
sql select * from $stb partition by tbname limit 1
|
|
||||||
if $rows != 10 then
|
|
||||||
return -1
|
|
||||||
endi
|
|
||||||
|
|
||||||
|
|
|
@ -722,7 +722,6 @@ sql select bottom(c1, 1) from $stb where ts >= $ts0 and ts <= $tsu limit 5 offse
|
||||||
if $rows != 0 then
|
if $rows != 0 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
sql select bottom(c1, 5) from $stb where ts >= $ts0 and ts <= $tsu limit 3 offset 5
|
sql select bottom(c1, 5) from $stb where ts >= $ts0 and ts <= $tsu limit 3 offset 5
|
||||||
if $rows != 0 then
|
if $rows != 0 then
|
||||||
return -1
|
return -1
|
||||||
|
|
Loading…
Reference in New Issue