From a28d714d98dce759db3ed4a86198d3b9da04fa19 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 11 Sep 2020 18:35:10 +0800 Subject: [PATCH] td-1423] fix bugs in group by nchar/binary columns. #3476 --- src/query/inc/qExecutor.h | 2 +- src/query/src/qExecutor.c | 43 +++++++++++++++++++++++++---------- src/query/src/qUtil.c | 47 +++++++++++++++++++++++++++++---------- 3 files changed, 67 insertions(+), 25 deletions(-) diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 25fb04fb9a..169bf907c6 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -57,7 +57,7 @@ typedef struct SWindowResult { uint16_t numOfRows; // number of rows of current time window bool closed; // this result status: closed or opened SResultInfo* resultInfo; // For each result column, there is a resultInfo - TSKEY skey; // start key of current time window + union {STimeWindow win; char* key;}; // start key of current time window } SWindowResult; /** diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 511d6c36ef..f4927ecb78 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -518,7 +518,7 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t } else { int32_t slot = curTimeWindowIndex(pWindowResInfo); SWindowResult* pWindowRes = getWindowResult(pWindowResInfo, slot); - w = GET_TIMEWINDOW(pWindowResInfo, pWindowRes); + w = pWindowRes->win; } if (w.skey > ts || w.ekey < ts) { @@ -624,7 +624,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes } // set time window for current result - pWindowRes->skey = win->skey; + pWindowRes->win = (*win); setWindowResOutputBufInitCtx(pRuntimeEnv, pWindowRes); return TSDB_CODE_SUCCESS; @@ -697,12 +697,12 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe continue; } - TSKEY ekey = pResult->skey + pWindowResInfo->interval; + TSKEY ekey = pResult->win.ekey; if ((ekey <= lastKey && QUERY_IS_ASC_QUERY(pQuery)) || - (pResult->skey >= lastKey && !QUERY_IS_ASC_QUERY(pQuery))) { + (pResult->win.skey >= lastKey && !QUERY_IS_ASC_QUERY(pQuery))) { closeTimeWindow(pWindowResInfo, i); } else { - skey = pResult->skey; + skey = pResult->win.skey; break; } } @@ -715,7 +715,7 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe pWindowResInfo->curIndex = i; } - pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex].skey; + pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex].win.skey; // the number of completed slots are larger than the threshold, return current generated results to client. if (numOfClosed > pWindowResInfo->threshold) { @@ -1097,8 +1097,25 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; - int64_t v = -1; // not assign result buffer yet, add new result buffer + char* d = pData; + int16_t len = bytes; + if (type == TSDB_DATA_TYPE_BINARY||type == TSDB_DATA_TYPE_NCHAR) { + d = varDataVal(pData); + len = varDataLen(pData); + } else if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) { + SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); + qError("QInfo:%p group by not supported on double/float/binary/nchar columns, abort", pQInfo); + + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); + } + + SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, d, len, true); + if (pWindowRes == NULL) { + return -1; + } + + int64_t v = -1; switch(type) { case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_TINYINT: v = GET_INT8_VAL(pData); break; @@ -1107,12 +1124,14 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat case TSDB_DATA_TYPE_BIGINT: v = GET_INT64_VAL(pData); break; } - SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pData, bytes, true); - if (pWindowRes == NULL) { - return -1; + if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { + pWindowRes->key = malloc(varDataTLen(pData)); + varDataCopy(pWindowRes->key, pData); + } else { + pWindowRes->win.skey = v; + pWindowRes->win.ekey = v; } - pWindowRes->skey = v; assert(pRuntimeEnv->windowResInfo.interval == 0); if (pWindowRes->pos.pageId == -1) { @@ -3004,7 +3023,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { char *b = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes, page); TSKEY ts = GET_INT64_VAL(b); - assert(ts == pWindowRes->skey); + assert(ts == pWindowRes->win.skey); int64_t num = getNumOfResultWindowRes(pQuery, pWindowRes); if (num <= 0) { cs.position[pos] += 1; diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 509362863c..c195a0b76c 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -126,11 +126,26 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { int32_t numOfClosed = numOfClosedTimeWindow(pWindowResInfo); assert(num >= 0 && num <= numOfClosed); - + + int16_t type = pWindowResInfo->type; + + char *key = NULL; + int16_t bytes = -1; + for (int32_t i = 0; i < num; ++i) { SWindowResult *pResult = &pWindowResInfo->pResult[i]; if (pResult->closed) { // remove the window slot from hash table - taosHashRemove(pWindowResInfo->hashList, (const char *)&pResult->skey, pWindowResInfo->type); + + // todo refactor + if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { + key = varDataVal(pResult->key); + bytes = varDataLen(pResult->key); + } else { + key = (char*) &pResult->win.skey; + bytes = tDataTypeDesc[pWindowResInfo->type].nSize; + } + + taosHashRemove(pWindowResInfo->hashList, (const char *)key, bytes); } else { break; } @@ -150,15 +165,24 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { } pWindowResInfo->size = remain; + for (int32_t k = 0; k < pWindowResInfo->size; ++k) { SWindowResult *pResult = &pWindowResInfo->pResult[k]; - int32_t *p = (int32_t *)taosHashGet(pWindowResInfo->hashList, (const char *)&pResult->skey, - tDataTypeDesc[pWindowResInfo->type].nSize); + + if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { + key = varDataVal(pResult->key); + bytes = varDataLen(pResult->key); + } else { + key = (char*) &pResult->win.skey; + bytes = tDataTypeDesc[pWindowResInfo->type].nSize; + } + + int32_t *p = (int32_t *)taosHashGet(pWindowResInfo->hashList, (const char *)key, bytes); assert(p != NULL); + int32_t v = (*p - num); assert(v >= 0 && v <= pWindowResInfo->size); - taosHashPut(pWindowResInfo->hashList, (char *)&pResult->skey, tDataTypeDesc[pWindowResInfo->type].nSize, - (char *)&v, sizeof(int32_t)); + taosHashPut(pWindowResInfo->hashList, (char *)key, bytes, (char *)&v, sizeof(int32_t)); } pWindowResInfo->curIndex = -1; @@ -207,20 +231,19 @@ void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_ } // get the result order - int32_t resultOrder = (pWindowResInfo->pResult[0].skey < pWindowResInfo->pResult[1].skey)? 1:-1; - + int32_t resultOrder = (pWindowResInfo->pResult[0].win.skey < pWindowResInfo->pResult[1].win.skey)? 1:-1; if (order != resultOrder) { return; } int32_t i = 0; if (order == QUERY_ASC_FORWARD_STEP) { - TSKEY ekey = pWindowResInfo->pResult[i].skey + pWindowResInfo->interval; + TSKEY ekey = pWindowResInfo->pResult[i].win.ekey; while (i < pWindowResInfo->size && (ekey < lastKey)) { ++i; } } else if (order == QUERY_DESC_FORWARD_STEP) { - while (i < pWindowResInfo->size && (pWindowResInfo->pResult[i].skey > lastKey)) { + while (i < pWindowResInfo->size && (pWindowResInfo->pResult[i].win.skey > lastKey)) { ++i; } } @@ -258,7 +281,7 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow pWindowRes->numOfRows = 0; pWindowRes->pos = (SPosInfo){-1, -1}; pWindowRes->closed = false; - pWindowRes->skey = TSKEY_INITIAL_VAL; + pWindowRes->win = TSWINDOW_INITIALIZER; } /** @@ -268,7 +291,7 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow */ void copyTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, const SWindowResult *src) { dst->numOfRows = src->numOfRows; - dst->skey = src->skey; + dst->win = src->win; dst->closed = src->closed; int32_t nOutputCols = pRuntimeEnv->pQuery->numOfOutput;