fix failed test cases
This commit is contained in:
parent
98baaf51cd
commit
605e63200a
|
@ -39,7 +39,7 @@ int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
|
||||||
void tscProcessActivityTimer(void *handle, void *tmrId);
|
void tscProcessActivityTimer(void *handle, void *tmrId);
|
||||||
int tscKeepConn[TSDB_SQL_MAX] = {0};
|
int tscKeepConn[TSDB_SQL_MAX] = {0};
|
||||||
|
|
||||||
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid);
|
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt);
|
||||||
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
|
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
|
||||||
void tscSaveSubscriptionProgress(void* sub);
|
void tscSaveSubscriptionProgress(void* sub);
|
||||||
|
|
||||||
|
@ -575,6 +575,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
|
||||||
|
|
||||||
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
|
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
|
||||||
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
|
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
|
||||||
|
TSKEY dfltKey = htobe64(pQueryMsg->window.skey);
|
||||||
|
|
||||||
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
|
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
|
||||||
if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || pTableMetaInfo->pVgroupTables == NULL) {
|
if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || pTableMetaInfo->pVgroupTables == NULL) {
|
||||||
|
@ -596,7 +597,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
|
||||||
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
|
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
|
||||||
pTableIdInfo->tid = htonl(pTableMeta->sid);
|
pTableIdInfo->tid = htonl(pTableMeta->sid);
|
||||||
pTableIdInfo->uid = htobe64(pTableMeta->uid);
|
pTableIdInfo->uid = htobe64(pTableMeta->uid);
|
||||||
pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid));
|
pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid, dfltKey));
|
||||||
|
|
||||||
pQueryMsg->numOfTables = htonl(1); // set the number of tables
|
pQueryMsg->numOfTables = htonl(1); // set the number of tables
|
||||||
|
|
||||||
|
@ -624,7 +625,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
|
||||||
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
|
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
|
||||||
pTableIdInfo->tid = htonl(pItem->tid);
|
pTableIdInfo->tid = htonl(pItem->tid);
|
||||||
pTableIdInfo->uid = htobe64(pItem->uid);
|
pTableIdInfo->uid = htobe64(pItem->uid);
|
||||||
pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pItem->uid));
|
pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pItem->uid, dfltKey));
|
||||||
pMsg += sizeof(STableIdInfo);
|
pMsg += sizeof(STableIdInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,16 +56,16 @@ static int tscCompareSubscriptionProgress(const void* a, const void* b) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid) {
|
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt) {
|
||||||
if (sub == NULL) {
|
if (sub == NULL) {
|
||||||
return 0;
|
return dflt;
|
||||||
}
|
}
|
||||||
SSub* pSub = (SSub*)sub;
|
SSub* pSub = (SSub*)sub;
|
||||||
|
|
||||||
SSubscriptionProgress target = {.uid = uid, .key = 0};
|
SSubscriptionProgress target = {.uid = uid, .key = 0};
|
||||||
SSubscriptionProgress* p = taosArraySearch(pSub->progress, tscCompareSubscriptionProgress, &target);
|
SSubscriptionProgress* p = taosArraySearch(pSub->progress, tscCompareSubscriptionProgress, &target);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
return INT64_MIN;
|
return dflt;
|
||||||
}
|
}
|
||||||
return p->key;
|
return p->key;
|
||||||
}
|
}
|
||||||
|
@ -228,7 +228,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
|
||||||
for( size_t i = 0; i < numOfTables; i++ ) {
|
for( size_t i = 0; i < numOfTables; i++ ) {
|
||||||
STidTags* tt = taosArrayGet( tables, i );
|
STidTags* tt = taosArrayGet( tables, i );
|
||||||
SSubscriptionProgress p = { .uid = tt->uid };
|
SSubscriptionProgress p = { .uid = tt->uid };
|
||||||
p.key = tscGetSubscriptionProgress(pSub, tt->uid);
|
p.key = tscGetSubscriptionProgress(pSub, tt->uid, INT64_MIN);
|
||||||
taosArrayPush(progress, &p);
|
taosArrayPush(progress, &p);
|
||||||
}
|
}
|
||||||
taosArraySort(progress, tscCompareSubscriptionProgress);
|
taosArraySort(progress, tscCompareSubscriptionProgress);
|
||||||
|
|
|
@ -4162,6 +4162,42 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
|
||||||
|
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||||
|
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||||
|
|
||||||
|
if (onlyQueryTags(pQuery)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isSTableQuery && (!isIntervalQuery(pQuery)) && (!isFixedOutputQuery(pQuery))) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
STsdbQueryCond cond = {
|
||||||
|
.twindow = pQuery->window,
|
||||||
|
.order = pQuery->order.order,
|
||||||
|
.colList = pQuery->colList,
|
||||||
|
.numOfCols = pQuery->numOfCols,
|
||||||
|
};
|
||||||
|
|
||||||
|
if (!isSTableQuery
|
||||||
|
&& (pQInfo->groupInfo.numOfTables == 1)
|
||||||
|
&& (cond.order == TSDB_ORDER_ASC)
|
||||||
|
&& (!isIntervalQuery(pQuery))
|
||||||
|
&& (!isGroupbyNormalCol(pQuery->pGroupbyExpr))
|
||||||
|
&& (!isFixedOutputQuery(pQuery))
|
||||||
|
) {
|
||||||
|
SArray* pa = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0);
|
||||||
|
SGroupItem* pItem = taosArrayGet(pa, 0);
|
||||||
|
cond.twindow = pItem->info->win;
|
||||||
|
}
|
||||||
|
|
||||||
|
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableIdGroupInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool isSTableQuery) {
|
int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool isSTableQuery) {
|
||||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||||
|
|
||||||
|
@ -4170,30 +4206,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool
|
||||||
|
|
||||||
setScanLimitationByResultBuffer(pQuery);
|
setScanLimitationByResultBuffer(pQuery);
|
||||||
changeExecuteScanOrder(pQuery, false);
|
changeExecuteScanOrder(pQuery, false);
|
||||||
|
setupQueryHandle(tsdb, pQInfo, isSTableQuery);
|
||||||
STsdbQueryCond cond = {
|
|
||||||
.twindow = pQuery->window,
|
|
||||||
.order = pQuery->order.order,
|
|
||||||
.colList = pQuery->colList,
|
|
||||||
.numOfCols = pQuery->numOfCols,
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
// normal query setup the queryhandle here
|
|
||||||
if (!onlyQueryTags(pQuery)) {
|
|
||||||
if (!isSTableQuery && isFirstLastRowQuery(pQuery)) { // in case of last_row query, invoke a different API.
|
|
||||||
pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableIdGroupInfo);
|
|
||||||
} else if (!isSTableQuery || isIntervalQuery(pQuery) || isFixedOutputQuery(pQuery)) {
|
|
||||||
|
|
||||||
if(pQInfo->groupInfo.numOfTables == 1) {
|
|
||||||
SArray* pa = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0);
|
|
||||||
SGroupItem* pItem = taosArrayGet(pa, 0);
|
|
||||||
cond.twindow = pItem->info->win;
|
|
||||||
}
|
|
||||||
|
|
||||||
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableIdGroupInfo);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pQInfo->tsdb = tsdb;
|
pQInfo->tsdb = tsdb;
|
||||||
pQInfo->vgId = vgId;
|
pQInfo->vgId = vgId;
|
||||||
|
@ -5772,7 +5785,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
|
||||||
if (pTableId != NULL ) {
|
if (pTableId != NULL ) {
|
||||||
window.skey = pTableId->key;
|
window.skey = pTableId->key;
|
||||||
} else {
|
} else {
|
||||||
window.skey = INT64_MIN;
|
window.skey = pQueryMsg->window.skey;
|
||||||
}
|
}
|
||||||
item.info = createTableQueryInfo(&pQInfo->runtimeEnv, item.id, window);
|
item.info = createTableQueryInfo(&pQInfo->runtimeEnv, item.id, window);
|
||||||
item.info->groupIdx = i;
|
item.info->groupIdx = i;
|
||||||
|
|
|
@ -27,7 +27,7 @@ sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol bool)
|
||||||
$i = 0
|
$i = 0
|
||||||
while $i < 5
|
while $i < 5
|
||||||
$tb = $tbPrefix . $i
|
$tb = $tbPrefix . $i
|
||||||
sql create table $tb using $mt tags( 0 )
|
sql create table $tb using $mt tags( $i )
|
||||||
$x = 0
|
$x = 0
|
||||||
while $x < $rowNum
|
while $x < $rowNum
|
||||||
$val = $x * 60000
|
$val = $x * 60000
|
||||||
|
|
Loading…
Reference in New Issue