Merge pull request #4744 from taosdata/feature/query
[TD-2587]<fix>: fix bugs introduced by refactor.
This commit is contained in:
commit
ab2791ab11
|
@ -69,14 +69,17 @@ TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) {
|
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) {
|
||||||
if( sub == NULL)
|
if( sub == NULL) {
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
SSub* pSub = (SSub*)sub;
|
SSub* pSub = (SSub*)sub;
|
||||||
|
|
||||||
SSubscriptionProgress target = {.uid = uid, .key = ts};
|
SSubscriptionProgress target = {.uid = uid, .key = ts};
|
||||||
SSubscriptionProgress* p = taosArraySearch(pSub->progress, &target, tscCompareSubscriptionProgress);
|
SSubscriptionProgress* p = taosArraySearch(pSub->progress, &target, tscCompareSubscriptionProgress);
|
||||||
if (p != NULL) {
|
if (p != NULL) {
|
||||||
p->key = ts;
|
p->key = ts;
|
||||||
|
tscDebug("subscribe:%s, uid:%"PRIu64" update sub start ts:%"PRId64, pSub->topic, p->uid, p->key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -502,6 +505,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
|
||||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||||
if (taosArrayGetSize(pSub->progress) > 0) { // fix crash in single tabel subscription
|
if (taosArrayGetSize(pSub->progress) > 0) { // fix crash in single tabel subscription
|
||||||
pQueryInfo->window.skey = ((SSubscriptionProgress*)taosArrayGet(pSub->progress, 0))->key;
|
pQueryInfo->window.skey = ((SSubscriptionProgress*)taosArrayGet(pSub->progress, 0))->key;
|
||||||
|
tscDebug("subscribe:%s set subscribe skey:%"PRId64, pSub->topic, pQueryInfo->window.skey);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSub->pTimer == NULL) {
|
if (pSub->pTimer == NULL) {
|
||||||
|
|
|
@ -1722,6 +1722,10 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
|
||||||
blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, searchFn, pDataBlock);
|
blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, searchFn, pDataBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// update the lastkey of current table for projection/aggregation query
|
||||||
|
TSKEY lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey;
|
||||||
|
pTableQueryInfo->lastKey = lastKey + GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||||
|
|
||||||
// interval query with limit applied
|
// interval query with limit applied
|
||||||
int32_t numOfRes = 0;
|
int32_t numOfRes = 0;
|
||||||
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) {
|
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) {
|
||||||
|
@ -4299,7 +4303,9 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
|
||||||
*(int32_t*)data = htonl(numOfTables);
|
*(int32_t*)data = htonl(numOfTables);
|
||||||
data += sizeof(int32_t);
|
data += sizeof(int32_t);
|
||||||
|
|
||||||
|
int32_t total = 0;
|
||||||
STableIdInfo* item = taosHashIterate(pQInfo->arrTableIdInfo, NULL);
|
STableIdInfo* item = taosHashIterate(pQInfo->arrTableIdInfo, NULL);
|
||||||
|
|
||||||
while(item) {
|
while(item) {
|
||||||
STableIdInfo* pDst = (STableIdInfo*)data;
|
STableIdInfo* pDst = (STableIdInfo*)data;
|
||||||
pDst->uid = htobe64(item->uid);
|
pDst->uid = htobe64(item->uid);
|
||||||
|
@ -4307,9 +4313,14 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
|
||||||
pDst->key = htobe64(item->key);
|
pDst->key = htobe64(item->key);
|
||||||
|
|
||||||
data += sizeof(STableIdInfo);
|
data += sizeof(STableIdInfo);
|
||||||
|
total++;
|
||||||
|
|
||||||
|
qDebug("QInfo:%p set subscribe info, tid:%d, uid:%"PRIu64", skey:%"PRId64, pQInfo, item->tid, item->uid, item->key);
|
||||||
item = taosHashIterate(pQInfo->arrTableIdInfo, item);
|
item = taosHashIterate(pQInfo->arrTableIdInfo, item);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
qDebug("QInfo:%p set %d subscribe info", pQInfo, total);
|
||||||
|
|
||||||
// Check if query is completed or not for stable query or normal table query respectively.
|
// Check if query is completed or not for stable query or normal table query respectively.
|
||||||
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
||||||
if (pQInfo->runtimeEnv.stableQuery) {
|
if (pQInfo->runtimeEnv.stableQuery) {
|
||||||
|
|
Loading…
Reference in New Issue