fix bug
This commit is contained in:
parent
624fb8b308
commit
c07d2ff9ee
|
@ -4341,6 +4341,65 @@ int32_t validateJoinNodes(SQueryInfo* pQueryInfo, SSqlObj* pSql) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
void mergeJoinNodesImpl(int8_t* r, int8_t* p, int16_t* tidx, SJoinNode** nodes, int32_t type) {
|
||||
SJoinNode *node = nodes[*tidx];
|
||||
SArray* arr = (type == 0) ? node->tsJoin : node->tagJoin;
|
||||
int32_t size = taosArrayGetSize(arr);
|
||||
|
||||
p[*tidx] = 1;
|
||||
|
||||
for (int32_t j = 0; j < size; j++) {
|
||||
int16_t* idx = taosArrayGet(arr, j);
|
||||
r[*idx] = 1;
|
||||
if (p[*idx] == 0) {
|
||||
mergeJoinNodesImpl(r, p, idx, nodes, type);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t mergeJoinNodes(SQueryInfo* pQueryInfo) {
|
||||
int8_t r[TSDB_MAX_JOIN_TABLE_NUM] = {0};
|
||||
int8_t p[TSDB_MAX_JOIN_TABLE_NUM] = {0};
|
||||
|
||||
for (int16_t i = 0; i < pQueryInfo->numOfTables; ++i) {
|
||||
mergeJoinNodesImpl(r, p, &i, pQueryInfo->tagCond.joinInfo.joinTables, 0);
|
||||
|
||||
taosArrayClear(pQueryInfo->tagCond.joinInfo.joinTables[i]->tsJoin);
|
||||
|
||||
for (int32_t j = 0; j < TSDB_MAX_JOIN_TABLE_NUM; ++j) {
|
||||
if (r[j]) {
|
||||
taosArrayPush(pQueryInfo->tagCond.joinInfo.joinTables[i]->tsJoin, &j);
|
||||
}
|
||||
}
|
||||
|
||||
memset(r, 0, sizeof(r));
|
||||
memset(p, 0, sizeof(p));
|
||||
}
|
||||
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
|
||||
for (int16_t i = 0; i < pQueryInfo->numOfTables; ++i) {
|
||||
mergeJoinNodesImpl(r, p, &i, pQueryInfo->tagCond.joinInfo.joinTables, 1);
|
||||
|
||||
taosArrayClear(pQueryInfo->tagCond.joinInfo.joinTables[i]->tagJoin);
|
||||
|
||||
for (int32_t j = 0; j < TSDB_MAX_JOIN_TABLE_NUM; ++j) {
|
||||
if (r[j]) {
|
||||
taosArrayPush(pQueryInfo->tagCond.joinInfo.joinTables[i]->tagJoin, &j);
|
||||
}
|
||||
}
|
||||
|
||||
memset(r, 0, sizeof(r));
|
||||
memset(p, 0, sizeof(p));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql) {
|
||||
if (pExpr == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -4419,6 +4478,11 @@ int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql
|
|||
if (ret) {
|
||||
goto PARSE_WHERE_EXIT;
|
||||
}
|
||||
|
||||
ret = mergeJoinNodes(pQueryInfo);
|
||||
if (ret) {
|
||||
goto PARSE_WHERE_EXIT;
|
||||
}
|
||||
}
|
||||
|
||||
PARSE_WHERE_EXIT:
|
||||
|
|
|
@ -118,156 +118,6 @@ static bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) {
|
|||
|
||||
|
||||
|
||||
static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJoinSupporter* pSupporter2, STimeWindow * win) {
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
|
||||
|
||||
STSBuf* output1 = tsBufCreate(true, pQueryInfo->order.order);
|
||||
STSBuf* output2 = tsBufCreate(true, pQueryInfo->order.order);
|
||||
|
||||
win->skey = INT64_MAX;
|
||||
win->ekey = INT64_MIN;
|
||||
|
||||
SLimitVal* pLimit = &pQueryInfo->limit;
|
||||
int32_t order = pQueryInfo->order.order;
|
||||
|
||||
SQueryInfo* pSubQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[0]->cmd, 0);
|
||||
SQueryInfo* pSubQueryInfo2 = tscGetQueryInfoDetail(&pSql->pSubs[1]->cmd, 0);
|
||||
|
||||
pSubQueryInfo1->tsBuf = output1;
|
||||
pSubQueryInfo2->tsBuf = output2;
|
||||
|
||||
TSKEY st = taosGetTimestampUs();
|
||||
|
||||
// no result generated, return directly
|
||||
if (pSupporter1->pTSBuf == NULL || pSupporter2->pTSBuf == NULL) {
|
||||
tscDebug("%p at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting", pSql);
|
||||
return 0;
|
||||
}
|
||||
|
||||
tsBufResetPos(pSupporter1->pTSBuf);
|
||||
tsBufResetPos(pSupporter2->pTSBuf);
|
||||
|
||||
if (!tsBufNextPos(pSupporter1->pTSBuf)) {
|
||||
tsBufFlush(output1);
|
||||
tsBufFlush(output2);
|
||||
|
||||
tscDebug("%p input1 is empty, 0 for secondary query after ts blocks intersecting", pSql);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (!tsBufNextPos(pSupporter2->pTSBuf)) {
|
||||
tsBufFlush(output1);
|
||||
tsBufFlush(output2);
|
||||
|
||||
tscDebug("%p input2 is empty, 0 for secondary query after ts blocks intersecting", pSql);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int64_t numOfInput1 = 1;
|
||||
int64_t numOfInput2 = 1;
|
||||
|
||||
while(1) {
|
||||
STSElem elem = tsBufGetElem(pSupporter1->pTSBuf);
|
||||
|
||||
// no data in pSupporter1 anymore, jump out of loop
|
||||
if (!tsBufIsValidElem(&elem)) {
|
||||
break;
|
||||
}
|
||||
|
||||
// find the data in supporter2 with the same tag value
|
||||
STSElem e2 = tsBufFindElemStartPosByTag(pSupporter2->pTSBuf, elem.tag);
|
||||
|
||||
/**
|
||||
* there are elements in pSupporter2 with the same tag, continue
|
||||
*/
|
||||
tVariant tag1 = {0};
|
||||
tVariantAssign(&tag1, elem.tag);
|
||||
|
||||
if (tsBufIsValidElem(&e2)) {
|
||||
while (1) {
|
||||
STSElem elem1 = tsBufGetElem(pSupporter1->pTSBuf);
|
||||
STSElem elem2 = tsBufGetElem(pSupporter2->pTSBuf);
|
||||
|
||||
// data with current are exhausted
|
||||
if (!tsBufIsValidElem(&elem1) || tVariantCompare(elem1.tag, &tag1) != 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (!tsBufIsValidElem(&elem2) || tVariantCompare(elem2.tag, &tag1) != 0) { // ignore all records with the same tag
|
||||
skipRemainValue(pSupporter1->pTSBuf, &tag1);
|
||||
break;
|
||||
}
|
||||
|
||||
/*
|
||||
* in case of stable query, limit/offset is not applied here. the limit/offset is applied to the
|
||||
* final results which is acquired after the secondary merge of in the client.
|
||||
*/
|
||||
int32_t re = tsCompare(order, elem1.ts, elem2.ts);
|
||||
if (re < 0) {
|
||||
tsBufNextPos(pSupporter1->pTSBuf);
|
||||
numOfInput1++;
|
||||
} else if (re > 0) {
|
||||
tsBufNextPos(pSupporter2->pTSBuf);
|
||||
numOfInput2++;
|
||||
} else {
|
||||
if (pLimit->offset == 0 || pQueryInfo->interval.interval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) {
|
||||
if (win->skey > elem1.ts) {
|
||||
win->skey = elem1.ts;
|
||||
}
|
||||
|
||||
if (win->ekey < elem1.ts) {
|
||||
win->ekey = elem1.ts;
|
||||
}
|
||||
|
||||
tsBufAppend(output1, elem1.id, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
|
||||
tsBufAppend(output2, elem2.id, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
|
||||
} else {
|
||||
pLimit->offset -= 1;//offset apply to projection?
|
||||
}
|
||||
|
||||
tsBufNextPos(pSupporter1->pTSBuf);
|
||||
numOfInput1++;
|
||||
|
||||
tsBufNextPos(pSupporter2->pTSBuf);
|
||||
numOfInput2++;
|
||||
}
|
||||
}
|
||||
} else { // no data in pSupporter2, ignore current data in pSupporter2
|
||||
skipRemainValue(pSupporter1->pTSBuf, &tag1);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* failed to set the correct ts order yet in two cases:
|
||||
* 1. only one element
|
||||
* 2. only one element for each tag.
|
||||
*/
|
||||
if (output1->tsOrder == -1) {
|
||||
output1->tsOrder = TSDB_ORDER_ASC;
|
||||
output2->tsOrder = TSDB_ORDER_ASC;
|
||||
}
|
||||
|
||||
tsBufFlush(output1);
|
||||
tsBufFlush(output2);
|
||||
|
||||
tsBufDestroy(pSupporter1->pTSBuf);
|
||||
pSupporter1->pTSBuf = NULL;
|
||||
tsBufDestroy(pSupporter2->pTSBuf);
|
||||
pSupporter2->pTSBuf = NULL;
|
||||
|
||||
TSKEY et = taosGetTimestampUs();
|
||||
tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks "
|
||||
"intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elapsed time:%" PRId64 " us",
|
||||
pSql, numOfInput1, numOfInput2, output1->numOfTotal, output1->numOfGroups, win->skey, win->ekey,
|
||||
tsBufGetNumOfGroup(output1), et - st);
|
||||
|
||||
return output1->numOfTotal;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) {
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
|
||||
|
||||
|
@ -279,7 +129,19 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) {
|
|||
int32_t joinNum = pSql->subState.numOfSub;
|
||||
SMergeTsCtx ctxlist[TSDB_MAX_JOIN_TABLE_NUM] = {0};
|
||||
SMergeTsCtx* ctxStack[TSDB_MAX_JOIN_TABLE_NUM] = {0};
|
||||
|
||||
int32_t slot = 0;
|
||||
int32_t tableNum = 0;
|
||||
int16_t* tableMIdx = 0;
|
||||
int32_t equalNum = 0;
|
||||
int32_t stackidx = 0;
|
||||
SMergeTsCtx* ctx = NULL;
|
||||
SMergeTsCtx* pctx = NULL;
|
||||
SMergeTsCtx* mainCtx = NULL;
|
||||
STSElem cur;
|
||||
STSElem prev;
|
||||
SArray* tsCond = NULL;
|
||||
int32_t mergeDone = 0;
|
||||
|
||||
for (int32_t i = 0; i < joinNum; ++i) {
|
||||
STSBuf* output = tsBufCreate(true, pQueryInfo->order.order);
|
||||
SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0);
|
||||
|
@ -288,14 +150,14 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) {
|
|||
|
||||
SJoinSupporter* pSupporter = pSql->pSubs[i]->param;
|
||||
|
||||
if (pSupporter[i]->pTSBuf == NULL) {
|
||||
if (pSupporter->pTSBuf == NULL) {
|
||||
tscDebug("%p at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting", pSql);
|
||||
return 0;
|
||||
}
|
||||
|
||||
tsBufResetPos(pSupporter[i]->pTSBuf);
|
||||
tsBufResetPos(pSupporter->pTSBuf);
|
||||
|
||||
if (!tsBufNextPos(pSupporter[i]->pTSBuf)) {
|
||||
if (!tsBufNextPos(pSupporter->pTSBuf)) {
|
||||
tscDebug("%p input1 is empty, 0 for secondary query after ts blocks intersecting", pSql);
|
||||
return 0;
|
||||
}
|
||||
|
@ -306,215 +168,181 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) {
|
|||
|
||||
TSKEY st = taosGetTimestampUs();
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
int32_t slot = 0;
|
||||
int32_t tableNum = 0;
|
||||
int16_t* tableMIdx = 0;
|
||||
int32_t equalNum = 0;
|
||||
int32_t stackidx = 0;
|
||||
int32_t mergeDone = 0;
|
||||
SMergeCtx* ctx = NULL;
|
||||
SMergeCtx* pctx = NULL;
|
||||
STidTags* cur = NULL;
|
||||
STidTags* prev = NULL;
|
||||
SArray* tagCond = NULL;
|
||||
|
||||
for (int16_t tidx = 0; tidx < joinNum; tidx++) {
|
||||
pctx = &ctxlist[tidx];
|
||||
if (pctx->compared) {
|
||||
continue;
|
||||
}
|
||||
|
||||
assert(pctx->idx == 0 && taosArrayGetSize(pctx->res) == 0);
|
||||
assert(pctx->numOfInput == 0);
|
||||
|
||||
tagCond = pQueryInfo->tagCond.joinInfo.joinTables[tidx]->tagJoin;
|
||||
tsCond = pQueryInfo->tagCond.joinInfo.joinTables[tidx]->tsJoin;
|
||||
|
||||
taosArrayInsert(tagCond, 0, &tidx);
|
||||
tableNum = taosArrayGetSize(tsCond);
|
||||
assert(tableNum >= 2);
|
||||
|
||||
tableNum = taosArrayGetSize(tagCond);
|
||||
assert(tableNum >= 1);
|
||||
|
||||
prev = (STidTags*) varDataVal(pctx->p->pIdTagList + pctx->idx * pctx->p->tagSize);
|
||||
pctx->compared = 1;
|
||||
for (int32_t i = 0; i < tableNum; ++i) {
|
||||
tableMIdx = taosArrayGet(tsCond, i);
|
||||
SMergeTsCtx* tctx = &ctxlist[*tableMIdx];
|
||||
tctx->compared = 1;
|
||||
}
|
||||
|
||||
ctxStack[stackidx++] = pctx;
|
||||
tableMIdx = taosArrayGet(tsCond, 0);
|
||||
pctx = &ctxlist[*tableMIdx];
|
||||
|
||||
tableMIdx = taosArrayGet(tagCond, ++slot);
|
||||
mainCtx = pctx;
|
||||
|
||||
equalNum = 1;
|
||||
|
||||
while (1) {
|
||||
ctx = &ctxlist[*tableMIdx];
|
||||
while (1) {
|
||||
pctx = mainCtx;
|
||||
|
||||
ctx->compared = 1;
|
||||
|
||||
cur = (STidTags*) varDataVal(ctx->p->pIdTagList + ctx->idx * ctx->p->tagSize);
|
||||
prev = tsBufGetElem(pctx->p->pTSBuf);
|
||||
|
||||
assert(cur->tid != 0 && prev->tid != 0);
|
||||
ctxStack[stackidx++] = pctx;
|
||||
|
||||
ctxStack[stackidx++] = ctx;
|
||||
|
||||
int32_t ret = doCompare(prev->tag, cur->tag, pColSchema->type, pColSchema->bytes);
|
||||
if (ret == 0) {
|
||||
if (++equalNum < tableNum) {
|
||||
prev = cur;
|
||||
pctx = ctx;
|
||||
|
||||
if (++slot >= tableNum) {
|
||||
slot = 0;
|
||||
}
|
||||
|
||||
tableMIdx = taosArrayGet(tagCond, slot);
|
||||
continue;
|
||||
}
|
||||
|
||||
tscDebug("%p tag matched, vgId:%d, val:%d, tid:%d, uid:%"PRIu64", tid:%d, uid:%"PRIu64, pParentSql, prev->vgId,
|
||||
*(int*) prev->tag, prev->tid, prev->uid, cur->tid, cur->uid);
|
||||
|
||||
assert(stackidx == tableNum);
|
||||
|
||||
for (int32_t i = 0; i < stackidx; ++i) {
|
||||
SMergeCtx* tctx = ctxStack[i];
|
||||
prev = (STidTags*) varDataVal(tctx->p->pIdTagList + tctx->idx * tctx->p->tagSize);
|
||||
|
||||
taosArrayPush(tctx->res, &prev);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < stackidx; ++i) {
|
||||
SMergeCtx* tctx = ctxStack[i];
|
||||
|
||||
if (++tctx->idx >= tctx->p->num) {
|
||||
mergeDone = 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (mergeDone) {
|
||||
break;
|
||||
}
|
||||
|
||||
stackidx = 0;
|
||||
equalNum = 1;
|
||||
|
||||
prev = (STidTags*) varDataVal(pctx->p->pIdTagList + pctx->idx * pctx->p->tagSize);
|
||||
|
||||
ctxStack[stackidx++] = pctx;
|
||||
} else if (ret > 0) {
|
||||
if (++ctx->idx >= ctx->p->num) {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
for (int32_t i = 0; i < stackidx; ++i) {
|
||||
SMergeCtx* tctx = ctxStack[i];
|
||||
if (++tctx->idx >= tctx->p->num) {
|
||||
mergeDone = 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (mergeDone) {
|
||||
break;
|
||||
}
|
||||
|
||||
stackidx = 0;
|
||||
equalNum = 1;
|
||||
|
||||
prev = (STidTags*) varDataVal(pctx->p->pIdTagList + pctx->idx * pctx->p->tagSize);
|
||||
ctxStack[stackidx++] = pctx;
|
||||
if (!tsBufIsValidElem(&prev)) {
|
||||
break;
|
||||
}
|
||||
|
||||
}
|
||||
tVariant tag = {0};
|
||||
tVariantAssign(&tag, prev.tag);
|
||||
|
||||
slot = 0;
|
||||
mergeDone = 0;
|
||||
}
|
||||
int32_t skipped = 0;
|
||||
|
||||
for (int32_t i = 1; i < tableNum; ++i) {
|
||||
SMergeTsCtx* tctx = &ctxlist[i];
|
||||
|
||||
// find the data in supporter2 with the same tag value
|
||||
STSElem e2 = tsBufFindElemStartPosByTag(tctx->p->pTSBuf, &tag);
|
||||
|
||||
if (!tsBufIsValidElem(&e2)) {
|
||||
skipRemainValue(pctx->p->pTSBuf, &tag);
|
||||
skipped = 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (skipped) {
|
||||
slot = 0;
|
||||
stackidx = 0;
|
||||
continue;
|
||||
}
|
||||
|
||||
tableMIdx = taosArrayGet(tsCond, ++slot);
|
||||
equalNum = 1;
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
int64_t numOfInput1 = 1;
|
||||
int64_t numOfInput2 = 1;
|
||||
|
||||
while(1) {
|
||||
STSElem elem = tsBufGetElem(pSupporter1->pTSBuf);
|
||||
|
||||
// no data in pSupporter1 anymore, jump out of loop
|
||||
if (!tsBufIsValidElem(&elem)) {
|
||||
break;
|
||||
}
|
||||
|
||||
// find the data in supporter2 with the same tag value
|
||||
STSElem e2 = tsBufFindElemStartPosByTag(pSupporter2->pTSBuf, elem.tag);
|
||||
|
||||
/**
|
||||
* there are elements in pSupporter2 with the same tag, continue
|
||||
*/
|
||||
tVariant tag1 = {0};
|
||||
tVariantAssign(&tag1, elem.tag);
|
||||
|
||||
if (tsBufIsValidElem(&e2)) {
|
||||
while (1) {
|
||||
STSElem elem1 = tsBufGetElem(pSupporter1->pTSBuf);
|
||||
STSElem elem2 = tsBufGetElem(pSupporter2->pTSBuf);
|
||||
ctx = &ctxlist[*tableMIdx];
|
||||
|
||||
prev = tsBufGetElem(pctx->p->pTSBuf);
|
||||
cur = tsBufGetElem(ctx->p->pTSBuf);
|
||||
|
||||
// data with current are exhausted
|
||||
if (!tsBufIsValidElem(&elem1) || tVariantCompare(elem1.tag, &tag1) != 0) {
|
||||
if (!tsBufIsValidElem(&prev) || tVariantCompare(prev.tag, &tag) != 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (!tsBufIsValidElem(&elem2) || tVariantCompare(elem2.tag, &tag1) != 0) { // ignore all records with the same tag
|
||||
skipRemainValue(pSupporter1->pTSBuf, &tag1);
|
||||
if (!tsBufIsValidElem(&cur) || tVariantCompare(cur.tag, &tag) != 0) { // ignore all records with the same tag
|
||||
break;
|
||||
}
|
||||
|
||||
/*
|
||||
* in case of stable query, limit/offset is not applied here. the limit/offset is applied to the
|
||||
* final results which is acquired after the secondary merge of in the client.
|
||||
*/
|
||||
int32_t re = tsCompare(order, elem1.ts, elem2.ts);
|
||||
if (re < 0) {
|
||||
tsBufNextPos(pSupporter1->pTSBuf);
|
||||
numOfInput1++;
|
||||
} else if (re > 0) {
|
||||
tsBufNextPos(pSupporter2->pTSBuf);
|
||||
numOfInput2++;
|
||||
} else {
|
||||
ctxStack[stackidx++] = ctx;
|
||||
|
||||
int32_t ret = tsCompare(order, prev.ts, cur.ts);
|
||||
if (ret == 0) {
|
||||
if (++equalNum < tableNum) {
|
||||
pctx = ctx;
|
||||
|
||||
if (++slot >= tableNum) {
|
||||
slot = 0;
|
||||
}
|
||||
|
||||
tableMIdx = taosArrayGet(tsCond, slot);
|
||||
continue;
|
||||
}
|
||||
|
||||
assert(stackidx == tableNum);
|
||||
|
||||
if (pLimit->offset == 0 || pQueryInfo->interval.interval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) {
|
||||
if (win->skey > elem1.ts) {
|
||||
win->skey = elem1.ts;
|
||||
if (win->skey > prev.ts) {
|
||||
win->skey = prev.ts;
|
||||
}
|
||||
|
||||
if (win->ekey < prev.ts) {
|
||||
win->ekey = prev.ts;
|
||||
}
|
||||
|
||||
if (win->ekey < elem1.ts) {
|
||||
win->ekey = elem1.ts;
|
||||
for (int32_t i = 0; i < stackidx; ++i) {
|
||||
SMergeTsCtx* tctx = ctxStack[i];
|
||||
prev = tsBufGetElem(tctx->p->pTSBuf);
|
||||
|
||||
tsBufAppend(tctx->res, prev.id, prev.tag, (const char*)&prev.ts, sizeof(prev.ts));
|
||||
}
|
||||
|
||||
tsBufAppend(output1, elem1.id, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
|
||||
tsBufAppend(output2, elem2.id, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
|
||||
} else {
|
||||
pLimit->offset -= 1;//offset apply to projection?
|
||||
}
|
||||
|
||||
tsBufNextPos(pSupporter1->pTSBuf);
|
||||
numOfInput1++;
|
||||
for (int32_t i = 0; i < stackidx; ++i) {
|
||||
SMergeTsCtx* tctx = ctxStack[i];
|
||||
|
||||
if (!tsBufNextPos(tctx->p->pTSBuf)) {
|
||||
mergeDone = 1;
|
||||
}
|
||||
tctx->numOfInput++;
|
||||
}
|
||||
|
||||
tsBufNextPos(pSupporter2->pTSBuf);
|
||||
numOfInput2++;
|
||||
if (mergeDone) {
|
||||
break;
|
||||
}
|
||||
|
||||
stackidx = 0;
|
||||
equalNum = 1;
|
||||
|
||||
ctxStack[stackidx++] = pctx;
|
||||
} else if (ret > 0) {
|
||||
if (!tsBufNextPos(ctx->p->pTSBuf)) {
|
||||
mergeDone = 1;
|
||||
break;
|
||||
}
|
||||
|
||||
ctx->numOfInput++;
|
||||
stackidx--;
|
||||
} else {
|
||||
stackidx--;
|
||||
|
||||
for (int32_t i = 0; i < stackidx; ++i) {
|
||||
SMergeTsCtx* tctx = ctxStack[i];
|
||||
|
||||
if (!tsBufNextPos(tctx->p->pTSBuf)) {
|
||||
mergeDone = 1;
|
||||
}
|
||||
tctx->numOfInput++;
|
||||
}
|
||||
|
||||
if (mergeDone) {
|
||||
break;
|
||||
}
|
||||
|
||||
stackidx = 0;
|
||||
equalNum = 1;
|
||||
|
||||
ctxStack[stackidx++] = pctx;
|
||||
}
|
||||
|
||||
}
|
||||
} else { // no data in pSupporter2, ignore current data in pSupporter2
|
||||
skipRemainValue(pSupporter1->pTSBuf, &tag1);
|
||||
|
||||
if (mergeDone) {
|
||||
break;
|
||||
}
|
||||
|
||||
slot = 0;
|
||||
stackidx = 0;
|
||||
|
||||
skipRemainValue(mainCtx->p->pTSBuf, &tag);
|
||||
}
|
||||
|
||||
stackidx = 0;
|
||||
slot = 0;
|
||||
mergeDone = 0;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -522,26 +350,31 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) {
|
|||
* 1. only one element
|
||||
* 2. only one element for each tag.
|
||||
*/
|
||||
if (output1->tsOrder == -1) {
|
||||
output1->tsOrder = TSDB_ORDER_ASC;
|
||||
output2->tsOrder = TSDB_ORDER_ASC;
|
||||
if (ctxlist[0].res->tsOrder == -1) {
|
||||
for (int32_t i = 0; i < joinNum; ++i) {
|
||||
ctxlist[i].res->tsOrder = TSDB_ORDER_ASC;
|
||||
}
|
||||
}
|
||||
|
||||
tsBufFlush(output1);
|
||||
tsBufFlush(output2);
|
||||
|
||||
tsBufDestroy(pSupporter1->pTSBuf);
|
||||
pSupporter1->pTSBuf = NULL;
|
||||
tsBufDestroy(pSupporter2->pTSBuf);
|
||||
pSupporter2->pTSBuf = NULL;
|
||||
for (int32_t i = 0; i < joinNum; ++i) {
|
||||
tsBufFlush(ctxlist[i].res);
|
||||
|
||||
tsBufDestroy(ctxlist[i].p->pTSBuf);
|
||||
ctxlist[i].p->pTSBuf = NULL;
|
||||
}
|
||||
|
||||
TSKEY et = taosGetTimestampUs();
|
||||
tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks "
|
||||
"intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elapsed time:%" PRId64 " us",
|
||||
pSql, numOfInput1, numOfInput2, output1->numOfTotal, output1->numOfGroups, win->skey, win->ekey,
|
||||
tsBufGetNumOfGroup(output1), et - st);
|
||||
|
||||
return output1->numOfTotal;
|
||||
for (int32_t i = 0; i < joinNum; ++i) {
|
||||
tsBufFlush(ctxlist[i].res);
|
||||
|
||||
tscDebug("%p tblidx:%d, input:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks "
|
||||
"intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elapsed time:%" PRId64 " us",
|
||||
pSql, i, ctxlist[i].numOfInput, ctxlist[i].res->numOfTotal, ctxlist[i].res->numOfGroups, win->skey, win->ekey,
|
||||
tsBufGetNumOfGroup(ctxlist[i].res), et - st);
|
||||
}
|
||||
|
||||
return ctxlist[0].res->numOfTotal;
|
||||
}
|
||||
|
||||
|
||||
|
@ -933,7 +766,7 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr
|
|||
|
||||
size_t numOfTables = taosArrayGetSize(tables);
|
||||
for (size_t i = 0; i < numOfTables; i++) {
|
||||
STidTags* tt = *(STidTags **)taosArrayGet(tables, i);
|
||||
STidTags* tt = taosArrayGet(tables, i);
|
||||
|
||||
if (prev == NULL || tt->vgId != prev->vgId) {
|
||||
SVgroupsInfo* pvg = pTableMetaInfo->vgroupList;
|
||||
|
@ -1069,7 +902,7 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
|
|||
SJoinSupporter* p = pParentSql->pSubs[i]->param;
|
||||
|
||||
ctxlist[i].p = p;
|
||||
ctxlist[i].res = taosArrayInit(p->num, sizeof(STidTags*));
|
||||
ctxlist[i].res = taosArrayInit(p->num, size);
|
||||
|
||||
tscDebug("Join %d - num:%d", i, p->num);
|
||||
|
||||
|
@ -1106,13 +939,20 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
|
|||
|
||||
tagCond = pQueryInfo->tagCond.joinInfo.joinTables[tidx]->tagJoin;
|
||||
|
||||
taosArrayInsert(tagCond, 0, &tidx);
|
||||
|
||||
tableNum = taosArrayGetSize(tagCond);
|
||||
assert(tableNum >= 1);
|
||||
assert(tableNum >= 2);
|
||||
|
||||
for (int32_t i = 0; i < tableNum; ++i) {
|
||||
tableMIdx = taosArrayGet(tagCond, i);
|
||||
SMergeCtx* tctx = &ctxlist[*tableMIdx];
|
||||
tctx->compared = 1;
|
||||
}
|
||||
|
||||
tableMIdx = taosArrayGet(tagCond, slot);
|
||||
|
||||
pctx = &ctxlist[*tableMIdx];
|
||||
|
||||
prev = (STidTags*) varDataVal(pctx->p->pIdTagList + pctx->idx * pctx->p->tagSize);
|
||||
pctx->compared = 1;
|
||||
|
||||
ctxStack[stackidx++] = pctx;
|
||||
|
||||
|
@ -1122,8 +962,6 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
|
|||
|
||||
while (1) {
|
||||
ctx = &ctxlist[*tableMIdx];
|
||||
|
||||
ctx->compared = 1;
|
||||
|
||||
cur = (STidTags*) varDataVal(ctx->p->pIdTagList + ctx->idx * ctx->p->tagSize);
|
||||
|
||||
|
@ -1154,7 +992,7 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
|
|||
SMergeCtx* tctx = ctxStack[i];
|
||||
prev = (STidTags*) varDataVal(tctx->p->pIdTagList + tctx->idx * tctx->p->tagSize);
|
||||
|
||||
taosArrayPush(tctx->res, &prev);
|
||||
taosArrayPush(tctx->res, prev);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < stackidx; ++i) {
|
||||
|
@ -1177,10 +1015,14 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
|
|||
|
||||
ctxStack[stackidx++] = pctx;
|
||||
} else if (ret > 0) {
|
||||
stackidx--;
|
||||
|
||||
if (++ctx->idx >= ctx->p->num) {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
stackidx--;
|
||||
|
||||
for (int32_t i = 0; i < stackidx; ++i) {
|
||||
SMergeCtx* tctx = ctxStack[i];
|
||||
if (++tctx->idx >= tctx->p->num) {
|
||||
|
@ -1204,6 +1046,7 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
|
|||
|
||||
slot = 0;
|
||||
mergeDone = 0;
|
||||
stackidx = 0;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < joinNum; ++i) {
|
||||
|
|
|
@ -1292,6 +1292,7 @@ bool tscColumnExists(SArray* pColumnList, SColumnIndex* pColIndex) {
|
|||
while (i < numOfCols) {
|
||||
SColumn* pCol = taosArrayGetP(pColumnList, i);
|
||||
if ((pCol->colIndex.columnIndex != col) || (pCol->colIndex.tableIndex != pColIndex->tableIndex)) {
|
||||
++i;
|
||||
continue;
|
||||
} else {
|
||||
break;
|
||||
|
|
Loading…
Reference in New Issue