From f3c92ad39b3151a6909e0f463eab34d8a9b7efff Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 31 Aug 2020 16:40:41 +0800 Subject: [PATCH 1/8] [td-1206] --- src/client/src/tscFunctionImpl.c | 18 +- src/query/inc/qPercentile.h | 64 +- src/query/src/qPercentile.c | 1219 ++++++++++++------------------ 3 files changed, 521 insertions(+), 780 deletions(-) diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index bd4aa70ee4..7f6ce1ed0e 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -2422,24 +2422,14 @@ static void top_bottom_func_finalizer(SQLFunctionCtx *pCtx) { /////////////////////////////////////////////////////////////////////////////////////////////// static bool percentile_function_setup(SQLFunctionCtx *pCtx) { - const int32_t MAX_AVAILABLE_BUFFER_SIZE = 1 << 20; // 1MB - const int32_t NUMOFCOLS = 1; - if (!function_setup(pCtx)) { return false; } SResultInfo *pResInfo = GET_RES_INFO(pCtx); - SSchema field[1] = { { (uint8_t)pCtx->inputType, "dummyCol", 0, pCtx->inputBytes } }; - - SColumnModel *pModel = createColumnModel(field, 1, 1000); - int32_t orderIdx = 0; - - // tOrderDesc object - tOrderDescriptor *pDesc = tOrderDesCreate(&orderIdx, NUMOFCOLS, pModel, TSDB_ORDER_DESC); - + ((SPercentileInfo *)(pResInfo->interResultBuf))->pMemBucket = - tMemBucketCreate(1024, MAX_AVAILABLE_BUFFER_SIZE, pCtx->inputBytes, pCtx->inputType, pDesc); + tMemBucketCreate(pCtx->inputBytes, pCtx->inputType); return true; } @@ -2485,15 +2475,13 @@ static void percentile_finalizer(SQLFunctionCtx *pCtx) { SResultInfo *pResInfo = GET_RES_INFO(pCtx); tMemBucket * pMemBucket = ((SPercentileInfo *)pResInfo->interResultBuf)->pMemBucket; - if (pMemBucket->numOfElems > 0) { // check for null + if (pMemBucket->total > 0) { // check for null *(double *)pCtx->aOutputBuf = getPercentile(pMemBucket, v); } else { setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes); } - tOrderDescDestroy(pMemBucket->pOrderDesc); 
tMemBucketDestroy(pMemBucket); - doFinalizer(pCtx); } diff --git a/src/query/inc/qPercentile.h b/src/query/inc/qPercentile.h index 52f666c338..0a52d4f205 100644 --- a/src/query/inc/qPercentile.h +++ b/src/query/inc/qPercentile.h @@ -17,6 +17,8 @@ #define TDENGINE_QPERCENTILE_H #include "qExtbuffer.h" +#include "qResultbuf.h" +#include "qTsbuf.h" typedef struct MinMaxEntry { union { @@ -31,47 +33,43 @@ typedef struct MinMaxEntry { }; } MinMaxEntry; -typedef struct tMemBucketSegment { - int32_t numOfSlots; - MinMaxEntry * pBoundingEntries; - tExtMemBuffer **pBuffer; -} tMemBucketSegment; +typedef struct { + int32_t size; + int32_t pageId; + tFilePage *data; +} SSlotInfo; + +typedef struct tMemBucketSlot { + SSlotInfo info; + MinMaxEntry range; +} tMemBucketSlot; + +struct tMemBucket; +typedef int32_t (*__perc_hash_func_t)(struct tMemBucket *pBucket, const void *value); typedef struct tMemBucket { - int16_t numOfSegs; - int16_t nTotalSlots; - int16_t nSlotsOfSeg; - int16_t dataType; - - int16_t nElemSize; - int32_t numOfElems; - - int32_t nTotalBufferSize; - int32_t maxElemsCapacity; - - int32_t pageSize; - int16_t numOfTotalPages; - int16_t numOfAvailPages; /* remain available buffer pages */ - - tMemBucketSegment *pSegs; - tOrderDescriptor * pOrderDesc; - - MinMaxEntry nRange; - - void (*HashFunc)(struct tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx); + int16_t numOfSlots; + int16_t type; + int16_t bytes; + int32_t total; + int32_t elemPerPage; // number of elements for each object + int32_t maxCapacity; // maximum allowed number of elements that can be sort directly to get the result + int32_t bufPageSize; // disk page size + MinMaxEntry range; // value range + int32_t times; // count that has been checked for deciding the correct data value buckets. 
+ __compar_fn_t comparFn; + + tMemBucketSlot *pSlots; + SDiskbasedResultBuf *pBuffer; + __perc_hash_func_t hashFunc; } tMemBucket; -tMemBucket *tMemBucketCreate(int32_t totalSlots, int32_t nBufferSize, int16_t nElemSize, int16_t dataType, - tOrderDescriptor *pDesc); +tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType); void tMemBucketDestroy(tMemBucket *pBucket); -void tMemBucketPut(tMemBucket *pBucket, void *data, int32_t numOfRows); +void tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size); double getPercentile(tMemBucket *pMemBucket, double percent); -void tBucketIntHash(tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx); - -void tBucketDoubleHash(tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx); - #endif // TDENGINE_QPERCENTILE_H diff --git a/src/query/src/qPercentile.c b/src/query/src/qPercentile.c index 19775075fc..c6eb836c61 100644 --- a/src/query/src/qPercentile.c +++ b/src/query/src/qPercentile.c @@ -14,310 +14,291 @@ */ #include "qPercentile.h" +#include "qResultbuf.h" #include "os.h" #include "queryLog.h" #include "taosdef.h" -#include "taosmsg.h" #include "tulog.h" +#include "tcompare.h" -tExtMemBuffer *releaseBucketsExceptFor(tMemBucket *pMemBucket, int16_t segIdx, int16_t slotIdx) { - tExtMemBuffer *pBuffer = NULL; - - for (int32_t i = 0; i < pMemBucket->numOfSegs; ++i) { - tMemBucketSegment *pSeg = &pMemBucket->pSegs[i]; - - for (int32_t j = 0; j < pSeg->numOfSlots; ++j) { - if (i == segIdx && j == slotIdx) { - pBuffer = pSeg->pBuffer[j]; - } else { - if (pSeg->pBuffer && pSeg->pBuffer[j]) { - pSeg->pBuffer[j] = destoryExtMemBuffer(pSeg->pBuffer[j]); - } - } - } - } - - return pBuffer; +#define DEFAULT_NUM_OF_SLOT 1024 + +int32_t getGroupId(int32_t numOfSlots, int32_t slotIndex, int32_t times) { + return (times * numOfSlots) + slotIndex; } -static tFilePage *loadIntoBucketFromDisk(tMemBucket *pMemBucket, int32_t segIdx, int32_t slotIdx, - tOrderDescriptor *pDesc) { - // release all 
data in other slots - tExtMemBuffer *pMemBuffer = pMemBucket->pSegs[segIdx].pBuffer[slotIdx]; - tFilePage * buffer = (tFilePage *)calloc(1, pMemBuffer->nElemSize * pMemBuffer->numOfTotalElems + sizeof(tFilePage)); - int32_t oldCapacity = pDesc->pColumnModel->capacity; - pDesc->pColumnModel->capacity = pMemBuffer->numOfTotalElems; - - if (!tExtMemBufferIsAllDataInMem(pMemBuffer)) { - pMemBuffer = releaseBucketsExceptFor(pMemBucket, segIdx, slotIdx); - assert(pMemBuffer->numOfTotalElems > 0); - - // load data in disk to memory - tFilePage *pPage = (tFilePage *)calloc(1, pMemBuffer->pageSize); - - for (uint32_t i = 0; i < pMemBuffer->fileMeta.flushoutData.nLength; ++i) { - tFlushoutInfo *pFlushInfo = &pMemBuffer->fileMeta.flushoutData.pFlushoutInfo[i]; - - int32_t ret = fseek(pMemBuffer->file, pFlushInfo->startPageId * pMemBuffer->pageSize, SEEK_SET); - UNUSED(ret); - - for (uint32_t j = 0; j < pFlushInfo->numOfPages; ++j) { - ret = (int32_t)fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file); - UNUSED(ret); - assert(pPage->num > 0); - - tColModelAppend(pDesc->pColumnModel, buffer, pPage->data, 0, (int32_t)pPage->num, (int32_t)pPage->num); - printf("id: %d count: %" PRIu64 "\n", j, buffer->num); - } - } - taosTFree(pPage); - - assert(buffer->num == pMemBuffer->fileMeta.numOfElemsInFile); +static tFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx) { + tFilePage *buffer = (tFilePage *)calloc(1, pMemBucket->bytes * pMemBucket->pSlots[slotIdx].info.size + sizeof(tFilePage)); + + int32_t groupId = getGroupId(pMemBucket->numOfSlots, slotIdx, pMemBucket->times); + SIDList list = getDataBufPagesIdList(pMemBucket->pBuffer, groupId); + + int32_t offset = 0; + for(int32_t i = 0; i < list->size; ++i) { + SPageInfo* pgInfo = *(SPageInfo**) taosArrayGet(list, i); + + tFilePage* pg = getResBufPage(pMemBucket->pBuffer, pgInfo->pageId); + memcpy(buffer->data + offset, pg->data, pg->num * pMemBucket->bytes); + + offset += pg->num * pMemBucket->bytes; } - - // 
load data in pMemBuffer to buffer - tFilePagesItem *pListItem = pMemBuffer->pHead; - while (pListItem != NULL) { - tColModelAppend(pDesc->pColumnModel, buffer, pListItem->item.data, 0, (int32_t)pListItem->item.num, - (int32_t)pListItem->item.num); - pListItem = pListItem->pNext; - } - - tColDataQSort(pDesc, (int32_t)buffer->num, 0, (int32_t)buffer->num - 1, buffer->data, TSDB_ORDER_ASC); - - pDesc->pColumnModel->capacity = oldCapacity; // restore value + + qsort(buffer->data, pMemBucket->pSlots[slotIdx].info.size, pMemBucket->bytes, pMemBucket->comparFn); return buffer; } -double findOnlyResult(tMemBucket *pMemBucket) { - assert(pMemBucket->numOfElems == 1); - - for (int32_t i = 0; i < pMemBucket->numOfSegs; ++i) { - tMemBucketSegment *pSeg = &pMemBucket->pSegs[i]; - if (pSeg->pBuffer) { - for (int32_t j = 0; j < pSeg->numOfSlots; ++j) { - tExtMemBuffer *pBuffer = pSeg->pBuffer[j]; - if (pBuffer) { - assert(pBuffer->numOfTotalElems == 1); - tFilePage *pPage = &pBuffer->pHead->item; - if (pBuffer->numOfElemsInBuffer == 1) { - switch (pMemBucket->dataType) { - case TSDB_DATA_TYPE_INT: - return *(int32_t *)pPage->data; - case TSDB_DATA_TYPE_SMALLINT: - return *(int16_t *)pPage->data; - case TSDB_DATA_TYPE_TINYINT: - return *(int8_t *)pPage->data; - case TSDB_DATA_TYPE_BIGINT: - return (double)(*(int64_t *)pPage->data); - case TSDB_DATA_TYPE_DOUBLE: { - double dv = GET_DOUBLE_VAL(pPage->data); - //return *(double *)pPage->data; - return dv; - } - case TSDB_DATA_TYPE_FLOAT: { - float fv = GET_FLOAT_VAL(pPage->data); - //return *(float *)pPage->data; - return fv; - } - default: - return 0; - } - } - } - } - } - } - return 0; -} - -void tBucketBigIntHash(tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx) { - int64_t v = *(int64_t *)value; - - if (pBucket->nRange.i64MaxVal == INT64_MIN) { - if (v >= 0) { - *segIdx = ((v >> (64 - 9)) >> 6) + 8; - *slotIdx = (v >> (64 - 9)) & 0x3F; - } else { // v<0 - *segIdx = ((-v) >> (64 - 9)) >> 6; - *slotIdx = ((-v) >> 
(64 - 9)) & 0x3F; - *segIdx = 7 - (*segIdx); - } - } else { - // todo hash for bigint and float and double - int64_t span = pBucket->nRange.i64MaxVal - pBucket->nRange.i64MinVal; - if (span < pBucket->nTotalSlots) { - int32_t delta = (int32_t)(v - pBucket->nRange.i64MinVal); - *segIdx = delta / pBucket->nSlotsOfSeg; - *slotIdx = delta % pBucket->nSlotsOfSeg; - } else { - double x = (double)span / pBucket->nTotalSlots; - double posx = (v - pBucket->nRange.i64MinVal) / x; - if (v == pBucket->nRange.i64MaxVal) { - posx -= 1; - } - - *segIdx = ((int32_t)posx) / pBucket->nSlotsOfSeg; - *slotIdx = ((int32_t)posx) % pBucket->nSlotsOfSeg; - } - } -} - -// todo refactor to more generic -void tBucketIntHash(tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx) { - int32_t v = *(int32_t *)value; - - if (pBucket->nRange.iMaxVal == INT32_MIN) { - /* - * taking negative integer into consideration, - * there is only half of pBucket->segs available for non-negative integer - */ - // int32_t numOfSlots = pBucket->nTotalSlots>>1; - // int32_t bits = bitsOfNumber(numOfSlots)-1; - - if (v >= 0) { - *segIdx = ((v >> (32 - 9)) >> 6) + 8; - *slotIdx = (v >> (32 - 9)) & 0x3F; - } else { // v<0 - *segIdx = ((-v) >> (32 - 9)) >> 6; - *slotIdx = ((-v) >> (32 - 9)) & 0x3F; - *segIdx = 7 - (*segIdx); - } - } else { - // divide a range of [iMinVal, iMaxVal] into 1024 buckets - int32_t span = pBucket->nRange.iMaxVal - pBucket->nRange.iMinVal; - if (span < pBucket->nTotalSlots) { - int32_t delta = v - pBucket->nRange.iMinVal; - *segIdx = delta / pBucket->nSlotsOfSeg; - *slotIdx = delta % pBucket->nSlotsOfSeg; - } else { - double x = (double)span / pBucket->nTotalSlots; - double posx = (v - pBucket->nRange.iMinVal) / x; - if (v == pBucket->nRange.iMaxVal) { - posx -= 1; - } - *segIdx = ((int32_t)posx) / pBucket->nSlotsOfSeg; - *slotIdx = ((int32_t)posx) % pBucket->nSlotsOfSeg; - } - } -} - -void tBucketDoubleHash(tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx) { 
- // double v = *(double *)value; - double v = GET_DOUBLE_VAL(value); - - if (pBucket->nRange.dMinVal == DBL_MAX) { - /* - * taking negative integer into consideration, - * there is only half of pBucket->segs available for non-negative integer - */ - double x = DBL_MAX / (pBucket->nTotalSlots >> 1); - double posx = (v + DBL_MAX) / x; - *segIdx = ((int32_t)posx) / pBucket->nSlotsOfSeg; - *slotIdx = ((int32_t)posx) % pBucket->nSlotsOfSeg; - } else { - // divide a range of [dMinVal, dMaxVal] into 1024 buckets - double span = pBucket->nRange.dMaxVal - pBucket->nRange.dMinVal; - if (span < pBucket->nTotalSlots) { - int32_t delta = (int32_t)(v - pBucket->nRange.dMinVal); - *segIdx = delta / pBucket->nSlotsOfSeg; - *slotIdx = delta % pBucket->nSlotsOfSeg; - } else { - double x = span / pBucket->nTotalSlots; - double posx = (v - pBucket->nRange.dMinVal) / x; - if (v == pBucket->nRange.dMaxVal) { - posx -= 1; - } - *segIdx = ((int32_t)posx) / pBucket->nSlotsOfSeg; - *slotIdx = ((int32_t)posx) % pBucket->nSlotsOfSeg; - } - - if (*segIdx < 0 || *segIdx > 16 || *slotIdx < 0 || *slotIdx > 64) { - uError("error in hash process. segment is: %d, slot id is: %d\n", *segIdx, *slotIdx); - } - } -} - -tMemBucket *tMemBucketCreate(int32_t totalSlots, int32_t nBufferSize, int16_t nElemSize, int16_t dataType, - tOrderDescriptor *pDesc) { - tMemBucket *pBucket = (tMemBucket *)malloc(sizeof(tMemBucket)); - pBucket->nTotalSlots = totalSlots; - pBucket->nSlotsOfSeg = 1 << 6; // 64 Segments, 16 slots each seg. 
- pBucket->dataType = dataType; - pBucket->nElemSize = nElemSize; - pBucket->pageSize = DEFAULT_PAGE_SIZE; - - pBucket->numOfElems = 0; - pBucket->numOfSegs = pBucket->nTotalSlots / pBucket->nSlotsOfSeg; - - pBucket->nTotalBufferSize = nBufferSize; - - pBucket->maxElemsCapacity = pBucket->nTotalBufferSize / pBucket->nElemSize; - - pBucket->numOfTotalPages = pBucket->nTotalBufferSize / pBucket->pageSize; - pBucket->numOfAvailPages = pBucket->numOfTotalPages; - - pBucket->pSegs = NULL; - pBucket->pOrderDesc = pDesc; - - switch (pBucket->dataType) { +static void resetBoundingBox(MinMaxEntry* range, int32_t type) { + switch (type) { + case TSDB_DATA_TYPE_BIGINT: { + range->i64MaxVal = INT64_MIN; + range->i64MinVal = INT64_MAX; + break; + }; case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_TINYINT: { - pBucket->nRange.iMinVal = INT32_MAX; - pBucket->nRange.iMaxVal = INT32_MIN; - pBucket->HashFunc = tBucketIntHash; + range->iMaxVal = INT32_MIN; + range->iMinVal = INT32_MAX; break; }; case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_FLOAT: { - pBucket->nRange.dMinVal = DBL_MAX; - pBucket->nRange.dMaxVal = -DBL_MAX; - pBucket->HashFunc = tBucketDoubleHash; + range->dMaxVal = -DBL_MAX; + range->dMinVal = DBL_MAX; break; - }; - case TSDB_DATA_TYPE_BIGINT: { - pBucket->nRange.i64MinVal = INT64_MAX; - pBucket->nRange.i64MaxVal = INT64_MIN; - pBucket->HashFunc = tBucketBigIntHash; - break; - }; - default: { - uError("MemBucket:%p,not support data type %d,failed", pBucket, pBucket->dataType); - taosTFree(pBucket); - return NULL; + } + } +} + +static void resetPosInfo(SSlotInfo* pInfo) { + pInfo->size = 0; + pInfo->pageId = -1; + pInfo->data = NULL; +} + +double findOnlyResult(tMemBucket *pMemBucket) { + assert(pMemBucket->total == 1); + + for (int32_t i = 0; i < pMemBucket->numOfSlots; ++i) { + tMemBucketSlot *pSlot = &pMemBucket->pSlots[i]; + if (pSlot->info.size == 0) { + continue; + } + + int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, 
pMemBucket->times); + SIDList list = getDataBufPagesIdList(pMemBucket->pBuffer, groupId); + assert(list->size == 1); + + SPageInfo* pgInfo = (SPageInfo*) taosArrayGetP(list, 0); + tFilePage* pPage = getResBufPage(pMemBucket->pBuffer, pgInfo->pageId); + assert(pPage->num == 1); + + switch (pMemBucket->type) { + case TSDB_DATA_TYPE_INT: + return *(int32_t *)pPage->data; + case TSDB_DATA_TYPE_SMALLINT: + return *(int16_t *)pPage->data; + case TSDB_DATA_TYPE_TINYINT: + return *(int8_t *)pPage->data; + case TSDB_DATA_TYPE_BIGINT: + return (double)(*(int64_t *)pPage->data); + case TSDB_DATA_TYPE_DOUBLE: { + double dv = GET_DOUBLE_VAL(pPage->data); + return dv; + } + case TSDB_DATA_TYPE_FLOAT: { + float fv = GET_FLOAT_VAL(pPage->data); + return fv; + } + default: + return 0; } } - int32_t numOfCols = pDesc->pColumnModel->numOfCols; - if (numOfCols != 1) { - uError("MemBucket:%p,only consecutive data is allowed,invalid numOfCols:%d", pBucket, numOfCols); - taosTFree(pBucket); + return 0; +} + +int32_t tBucketBigIntHash(tMemBucket *pBucket, const void *value) { + int64_t v = *(int64_t *)value; + int32_t index = -1; + + int32_t halfSlot = pBucket->numOfSlots >> 1; +// int32_t bits = 32;//bitsOfNumber(pBucket->numOfSlots) - 1; + + if (pBucket->range.i64MaxVal == INT64_MIN) { + if (v >= 0) { + index = (v >> (64 - 9)) + halfSlot; + } else { // v<0 + index = ((-v) >> (64 - 9)); + index = -index + (halfSlot - 1); + } + + return index; + } else { + // todo hash for bigint and float and double + int64_t span = pBucket->range.i64MaxVal - pBucket->range.i64MinVal; + if (span < pBucket->numOfSlots) { + int32_t delta = (int32_t)(v - pBucket->range.i64MinVal); + index = delta % pBucket->numOfSlots; + } else { + double slotSpan = (double)span / pBucket->numOfSlots; + index = (v - pBucket->range.i64MinVal) / slotSpan; + if (v == pBucket->range.i64MaxVal) { + index -= 1; + } + } + + return index; + } +} + +// todo refactor to more generic +int32_t tBucketIntHash(tMemBucket *pBucket, const 
void *value) { + int32_t v = *(int32_t *)value; + int32_t index = -1; + + if (pBucket->range.iMaxVal == INT32_MIN) { + /* + * taking negative integer into consideration, + * there is only half of pBucket->segs available for non-negative integer + */ + int32_t halfSlot = pBucket->numOfSlots >> 1; + int32_t bits = 32;//bitsOfNumber(pBucket->numOfSlots) - 1; + + if (v >= 0) { + index = (v >> (bits - 9)) + halfSlot; + } else { // v < 0 + index = ((-v) >> (32 - 9)); + index = -index + (halfSlot - 1); + } + + return index; + } else { + // divide a range of [iMinVal, iMaxVal] into 1024 buckets + int32_t span = pBucket->range.iMaxVal - pBucket->range.iMinVal; + if (span < pBucket->numOfSlots) { + int32_t delta = v - pBucket->range.iMinVal; + index = (delta % pBucket->numOfSlots); + } else { + double slotSpan = (double)span / pBucket->numOfSlots; + index = (v - pBucket->range.iMinVal) / slotSpan; + if (v == pBucket->range.iMaxVal) { + index -= 1; + } + } + + return index; + } +} + +int32_t tBucketDoubleHash(tMemBucket *pBucket, const void *value) { + double v = GET_DOUBLE_VAL(value); + int32_t index = -1; + + if (pBucket->range.dMinVal == DBL_MAX) { + /* + * taking negative integer into consideration, + * there is only half of pBucket->segs available for non-negative integer + */ + double x = DBL_MAX / (pBucket->numOfSlots >> 1); + double posx = (v + DBL_MAX) / x; + return ((int32_t)posx) % pBucket->numOfSlots; + } else { + // divide a range of [dMinVal, dMaxVal] into 1024 buckets + double span = pBucket->range.dMaxVal - pBucket->range.dMinVal; + if (span < pBucket->numOfSlots) { + int32_t delta = (int32_t)(v - pBucket->range.dMinVal); + index = (delta % pBucket->numOfSlots); + } else { + double slotSpan = span / pBucket->numOfSlots; + index = (v - pBucket->range.dMinVal) / slotSpan; + if (v == pBucket->range.dMaxVal) { + index -= 1; + } + } + + if (index < 0 || index > pBucket->numOfSlots) { + uError("error in hash process. 
slot id: %d", index); + } + + return index; + } +} + +static __perc_hash_func_t getHashFunc(int32_t type) { + switch (type) { + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_TINYINT: { + return tBucketIntHash; + }; + + case TSDB_DATA_TYPE_DOUBLE: + case TSDB_DATA_TYPE_FLOAT: { + return tBucketDoubleHash; + }; + + case TSDB_DATA_TYPE_BIGINT: { + return tBucketBigIntHash; + }; + + default: { + return NULL; + } + } +} + +static void resetSlotInfo(tMemBucket* pBucket) { + for (int32_t i = 0; i < pBucket->numOfSlots; ++i) { + tMemBucketSlot* pSlot = &pBucket->pSlots[i]; + + resetBoundingBox(&pSlot->range, pBucket->type); + resetPosInfo(&pSlot->info); + } +} + +tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType) { + tMemBucket *pBucket = (tMemBucket *)calloc(1, sizeof(tMemBucket)); + if (pBucket == NULL) { return NULL; } - SSchema* pSchema = getColumnModelSchema(pDesc->pColumnModel, 0); - if (pSchema->type != dataType) { - uError("MemBucket:%p,data type is not consistent,%d in schema, %d in param", pBucket, pSchema->type, dataType); - taosTFree(pBucket); + pBucket->numOfSlots = DEFAULT_NUM_OF_SLOT; + pBucket->bufPageSize = DEFAULT_PAGE_SIZE * 4; // 4k per page + + pBucket->type = dataType; + pBucket->bytes = nElemSize; + pBucket->total = 0; + pBucket->times = 1; + + pBucket->maxCapacity = 200000; + + pBucket->elemPerPage = (pBucket->bufPageSize - sizeof(tFilePage))/pBucket->bytes; + pBucket->comparFn = getKeyComparFunc(pBucket->type); + resetBoundingBox(&pBucket->range, pBucket->type); + + pBucket->hashFunc = getHashFunc(pBucket->type); + if (pBucket->hashFunc == NULL) { + uError("MemBucket:%p, not support data type %d, failed", pBucket, pBucket->type); + free(pBucket); return NULL; } - if (pBucket->numOfTotalPages < pBucket->nTotalSlots) { - uWarn("MemBucket:%p,total buffer pages %d are not enough for all slots", pBucket, pBucket->numOfTotalPages); + pBucket->pSlots = (tMemBucketSlot *)calloc(pBucket->numOfSlots, 
sizeof(tMemBucketSlot)); + if (pBucket->pSlots == NULL) { + free(pBucket); + return NULL; } - pBucket->pSegs = (tMemBucketSegment *)malloc(pBucket->numOfSegs * sizeof(tMemBucketSegment)); + resetSlotInfo(pBucket); - for (int32_t i = 0; i < pBucket->numOfSegs; ++i) { - pBucket->pSegs[i].numOfSlots = pBucket->nSlotsOfSeg; - pBucket->pSegs[i].pBuffer = NULL; - pBucket->pSegs[i].pBoundingEntries = NULL; + int32_t ret = createDiskbasedResultBuffer(&pBucket->pBuffer, pBucket->bytes, pBucket->bufPageSize, pBucket->bufPageSize * 512, NULL); + if (ret != TSDB_CODE_SUCCESS) { + tMemBucketDestroy(pBucket); + return NULL; } - - uDebug("MemBucket:%p,created,buffer size:%ld,elem size:%d", pBucket, pBucket->numOfTotalPages * DEFAULT_PAGE_SIZE, - pBucket->nElemSize); - + + uDebug("MemBucket:%p, elem size:%d", pBucket, pBucket->bytes); return pBucket; } @@ -326,81 +307,11 @@ void tMemBucketDestroy(tMemBucket *pBucket) { return; } - if (pBucket->pSegs) { - for (int32_t i = 0; i < pBucket->numOfSegs; ++i) { - tMemBucketSegment *pSeg = &(pBucket->pSegs[i]); - taosTFree(pSeg->pBoundingEntries); - - if (pSeg->pBuffer == NULL || pSeg->numOfSlots == 0) { - continue; - } - - for (int32_t j = 0; j < pSeg->numOfSlots; ++j) { - if (pSeg->pBuffer[j] != NULL) { - pSeg->pBuffer[j] = destoryExtMemBuffer(pSeg->pBuffer[j]); - } - } - taosTFree(pSeg->pBuffer); - } - } - - taosTFree(pBucket->pSegs); + destroyResultBuf(pBucket->pBuffer); + taosTFree(pBucket->pSlots); taosTFree(pBucket); } -/* - * find the slots which accounts for largest proportion of total in-memory buffer - */ -static void tBucketGetMaxMemSlot(tMemBucket *pBucket, int16_t *segIdx, int16_t *slotIdx) { - *segIdx = -1; - *slotIdx = -1; - - int32_t val = 0; - for (int32_t k = 0; k < pBucket->numOfSegs; ++k) { - tMemBucketSegment *pSeg = &pBucket->pSegs[k]; - for (int32_t i = 0; i < pSeg->numOfSlots; ++i) { - if (pSeg->pBuffer == NULL || pSeg->pBuffer[i] == NULL) { - continue; - } - - if (val < pSeg->pBuffer[i]->numOfInMemPages) { - val 
= pSeg->pBuffer[i]->numOfInMemPages; - *segIdx = k; - *slotIdx = i; - } - } - } -} - -static void resetBoundingBox(tMemBucketSegment *pSeg, int32_t type) { - switch (type) { - case TSDB_DATA_TYPE_BIGINT: { - for (int32_t i = 0; i < pSeg->numOfSlots; ++i) { - pSeg->pBoundingEntries[i].i64MaxVal = INT64_MIN; - pSeg->pBoundingEntries[i].i64MinVal = INT64_MAX; - } - break; - }; - case TSDB_DATA_TYPE_INT: - case TSDB_DATA_TYPE_SMALLINT: - case TSDB_DATA_TYPE_TINYINT: { - for (int32_t i = 0; i < pSeg->numOfSlots; ++i) { - pSeg->pBoundingEntries[i].iMaxVal = INT32_MIN; - pSeg->pBoundingEntries[i].iMinVal = INT32_MAX; - } - break; - }; - case TSDB_DATA_TYPE_DOUBLE: - case TSDB_DATA_TYPE_FLOAT: { - for (int32_t i = 0; i < pSeg->numOfSlots; ++i) { - pSeg->pBoundingEntries[i].dMaxVal = -DBL_MAX; - pSeg->pBoundingEntries[i].dMinVal = DBL_MAX; - } - break; - } - } -} - void tMemBucketUpdateBoundingBox(MinMaxEntry *r, char *data, int32_t dataType) { switch (dataType) { case TSDB_DATA_TYPE_INT: { @@ -461,7 +372,6 @@ void tMemBucketUpdateBoundingBox(MinMaxEntry *r, char *data, int32_t dataType) { break; }; case TSDB_DATA_TYPE_FLOAT: { - // double val = *(float *)data; double val = GET_FLOAT_VAL(data); if (r->dMinVal > val) { @@ -478,171 +388,95 @@ void tMemBucketUpdateBoundingBox(MinMaxEntry *r, char *data, int32_t dataType) { } /* - * in memory bucket, we only accept the simple data consecutive put in a row/column - * no column-model in this case. 
+ * in memory bucket, we only accept data array list */ -void tMemBucketPut(tMemBucket *pBucket, void *data, int32_t numOfRows) { - pBucket->numOfElems += numOfRows; - int16_t segIdx = 0, slotIdx = 0; +void tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) { + assert(pBucket != NULL && data != NULL && size > 0); + pBucket->total += size; - for (int32_t i = 0; i < numOfRows; ++i) { - char *d = (char *)data + i * tDataTypeDesc[pBucket->dataType].nSize; + int32_t bytes = pBucket->bytes; - switch (pBucket->dataType) { - case TSDB_DATA_TYPE_SMALLINT: { - int32_t val = *(int16_t *)d; - (pBucket->HashFunc)(pBucket, &val, &segIdx, &slotIdx); - break; - } - case TSDB_DATA_TYPE_TINYINT: { - int32_t val = *(int8_t *)d; - (pBucket->HashFunc)(pBucket, &val, &segIdx, &slotIdx); - break; - } - case TSDB_DATA_TYPE_INT: { - int32_t val = *(int32_t *)d; - (pBucket->HashFunc)(pBucket, &val, &segIdx, &slotIdx); - break; - } - case TSDB_DATA_TYPE_BIGINT: { - int64_t val = *(int64_t *)d; - (pBucket->HashFunc)(pBucket, &val, &segIdx, &slotIdx); - break; - } - case TSDB_DATA_TYPE_DOUBLE: { - // double val = *(double *)d; - double val = GET_DOUBLE_VAL(d); - (pBucket->HashFunc)(pBucket, &val, &segIdx, &slotIdx); - break; - } - case TSDB_DATA_TYPE_FLOAT: { - // double val = *(float *)d; - double val = GET_FLOAT_VAL(d); - (pBucket->HashFunc)(pBucket, &val, &segIdx, &slotIdx); - break; - } - } + for (int32_t i = 0; i < size; ++i) { + char *d = (char *) data + i * bytes; - tMemBucketSegment *pSeg = &pBucket->pSegs[segIdx]; - if (pSeg->pBoundingEntries == NULL) { - pSeg->pBoundingEntries = (MinMaxEntry *)malloc(sizeof(MinMaxEntry) * pBucket->nSlotsOfSeg); - resetBoundingBox(pSeg, pBucket->dataType); - } + int32_t slotIdx = (pBucket->hashFunc)(pBucket, d); + assert(slotIdx >= 0); - if (pSeg->pBuffer == NULL) { - pSeg->pBuffer = (tExtMemBuffer **)calloc(pBucket->nSlotsOfSeg, sizeof(void *)); - } - - if (pSeg->pBuffer[slotIdx] == NULL) { - pSeg->pBuffer[slotIdx] = 
createExtMemBuffer(pBucket->numOfTotalPages * pBucket->pageSize, pBucket->nElemSize, - pBucket->pageSize, pBucket->pOrderDesc->pColumnModel); - pSeg->pBuffer[slotIdx]->flushModel = SINGLE_APPEND_MODEL; - pBucket->pOrderDesc->pColumnModel->capacity = pSeg->pBuffer[slotIdx]->numOfElemsPerPage; - } - - tMemBucketUpdateBoundingBox(&pSeg->pBoundingEntries[slotIdx], d, pBucket->dataType); + tMemBucketSlot *pSlot = &pBucket->pSlots[slotIdx]; + tMemBucketUpdateBoundingBox(&pSlot->range, d, pBucket->type); // ensure available memory pages to allocate - int16_t cseg = 0, cslot = 0; - if (pBucket->numOfAvailPages == 0) { - uDebug("MemBucket:%p,max avail size:%d, no avail memory pages,", pBucket, pBucket->numOfTotalPages); + int32_t groupId = getGroupId(pBucket->numOfSlots, slotIdx, pBucket->times); + int32_t pageId = -1; - tBucketGetMaxMemSlot(pBucket, &cseg, &cslot); - if (cseg == -1 || cslot == -1) { - uError("MemBucket:%p,failed to find appropriated avail buffer", pBucket); - return; + if (pSlot->info.data == NULL || pSlot->info.data->num >= pBucket->elemPerPage) { + if (pSlot->info.data != NULL) { + assert(pSlot->info.data->num >= pBucket->elemPerPage && pSlot->info.size > 0); + + // keep the pointer in memory + releaseResBufPage(pBucket->pBuffer, pSlot->info.data); + pSlot->info.data = NULL; } - if (cseg != segIdx || cslot != slotIdx) { - pBucket->numOfAvailPages += pBucket->pSegs[cseg].pBuffer[cslot]->numOfInMemPages; - - int32_t avail = pBucket->pSegs[cseg].pBuffer[cslot]->numOfInMemPages; - UNUSED(avail); - tExtMemBufferFlush(pBucket->pSegs[cseg].pBuffer[cslot]); - - uDebug("MemBucket:%p,seg:%d,slot:%d flushed to disk,new avail pages:%d", pBucket, cseg, cslot, - pBucket->numOfAvailPages); - } else { - uDebug("MemBucket:%p,failed to choose slot to flush to disk seg:%d,slot:%d", pBucket, cseg, cslot); - } + pSlot->info.data = getNewDataBuf(pBucket->pBuffer, groupId, &pageId); + pSlot->info.pageId = pageId; } - int16_t consumedPgs = 
pSeg->pBuffer[slotIdx]->numOfInMemPages; - int16_t newPgs = tExtMemBufferPut(pSeg->pBuffer[slotIdx], d, 1); - /* - * trigger 1. page re-allocation, to reduce the available pages - * 2. page flushout, to increase the available pages - */ - pBucket->numOfAvailPages += (consumedPgs - newPgs); + memcpy(pSlot->info.data->data + pSlot->info.data->num * pBucket->bytes, d, pBucket->bytes); + + pSlot->info.data->num += 1; + pSlot->info.size += 1; } } -void releaseBucket(tMemBucket *pMemBucket, int32_t segIdx, int32_t slotIdx) { - if (segIdx < 0 || segIdx > pMemBucket->numOfSegs || slotIdx < 0) { - return; - } - - tMemBucketSegment *pSeg = &pMemBucket->pSegs[segIdx]; - if (slotIdx < 0 || slotIdx >= pSeg->numOfSlots || pSeg->pBuffer[slotIdx] == NULL) { - return; - } - - pSeg->pBuffer[slotIdx] = destoryExtMemBuffer(pSeg->pBuffer[slotIdx]); -} - //////////////////////////////////////////////////////////////////////////////////////////// static void findMaxMinValue(tMemBucket *pMemBucket, double *maxVal, double *minVal) { *minVal = DBL_MAX; *maxVal = -DBL_MAX; - for (int32_t i = 0; i < pMemBucket->numOfSegs; ++i) { - tMemBucketSegment *pSeg = &pMemBucket->pSegs[i]; - if (pSeg->pBuffer == NULL) { + for (int32_t i = 0; i < pMemBucket->numOfSlots; ++i) { + tMemBucketSlot *pSlot = &pMemBucket->pSlots[i]; + if (pSlot->info.size == 0) { continue; } - switch (pMemBucket->dataType) { + + switch (pMemBucket->type) { case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_TINYINT: { - for (int32_t j = 0; j < pSeg->numOfSlots; ++j) { - double minv = pSeg->pBoundingEntries[j].iMinVal; - double maxv = pSeg->pBoundingEntries[j].iMaxVal; + double minv = pSlot->range.iMinVal; + double maxv = pSlot->range.iMaxVal; - if (*minVal > minv) { - *minVal = minv; - } - if (*maxVal < maxv) { - *maxVal = maxv; - } + if (*minVal > minv) { + *minVal = minv; + } + if (*maxVal < maxv) { + *maxVal = maxv; } break; } case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_FLOAT: { - for (int32_t j = 
0; j < pSeg->numOfSlots; ++j) { - double minv = pSeg->pBoundingEntries[j].dMinVal; - double maxv = pSeg->pBoundingEntries[j].dMaxVal; + double minv = pSlot->range.dMinVal; + double maxv = pSlot->range.dMaxVal; - if (*minVal > minv) { - *minVal = minv; - } - if (*maxVal < maxv) { - *maxVal = maxv; - } + if (*minVal > minv) { + *minVal = minv; + } + if (*maxVal < maxv) { + *maxVal = maxv; } break; } case TSDB_DATA_TYPE_BIGINT: { - for (int32_t j = 0; j < pSeg->numOfSlots; ++j) { - double minv = (double)pSeg->pBoundingEntries[j].i64MinVal; - double maxv = (double)pSeg->pBoundingEntries[j].i64MaxVal; + double minv = (double)pSlot->range.i64MinVal; + double maxv = (double)pSlot->range.i64MaxVal; - if (*minVal > minv) { - *minVal = minv; - } - if (*maxVal < maxv) { - *maxVal = maxv; - } + if (*minVal > minv) { + *minVal = minv; + } + if (*maxVal < maxv) { + *maxVal = maxv; } break; } @@ -650,20 +484,6 @@ static void findMaxMinValue(tMemBucket *pMemBucket, double *maxVal, double *minV } } -static MinMaxEntry getMinMaxEntryOfNearestSlotInNextSegment(tMemBucket *pMemBucket, int32_t segIdx) { - int32_t i = segIdx + 1; - while (i < pMemBucket->numOfSegs && pMemBucket->pSegs[i].numOfSlots == 0) ++i; - - tMemBucketSegment *pSeg = &pMemBucket->pSegs[i]; - assert(pMemBucket->numOfSegs > i && pMemBucket->pSegs[i].pBuffer != NULL); - - i = 0; - while (i < pMemBucket->nSlotsOfSeg && pSeg->pBuffer[i] == NULL) ++i; - - assert(i < pMemBucket->nSlotsOfSeg); - return pSeg->pBoundingEntries[i]; -} - /* * * now, we need to find the minimum value of the next slot for @@ -671,262 +491,198 @@ static MinMaxEntry getMinMaxEntryOfNearestSlotInNextSegment(tMemBucket *pMemBuck * j is the last slot of current segment, we need to get the first * slot of the next segment. 
*/ -static MinMaxEntry getMinMaxEntryOfNextSlotWithData(tMemBucket *pMemBucket, int32_t segIdx, int32_t slotIdx) { - tMemBucketSegment *pSeg = &pMemBucket->pSegs[segIdx]; - - MinMaxEntry next; - if (slotIdx == pSeg->numOfSlots - 1) { // find next segment with data - return getMinMaxEntryOfNearestSlotInNextSegment(pMemBucket, segIdx); - } else { +static MinMaxEntry getMinMaxEntryOfNextSlotWithData(tMemBucket *pMemBucket, int32_t slotIdx) { int32_t j = slotIdx + 1; - for (; j < pMemBucket->nSlotsOfSeg && pMemBucket->pSegs[segIdx].pBuffer[j] == 0; ++j) { + while (j < pMemBucket->numOfSlots && (pMemBucket->pSlots[j].info.size == 0)) { + ++j; + } + + assert(j < pMemBucket->numOfSlots); + return pMemBucket->pSlots[j].range; +} + +static bool isIdenticalData(tMemBucket *pMemBucket, int32_t index); +char *getFirstElemOfMemBuffer(tMemBucketSlot *pSeg, int32_t slotIdx, tFilePage *pPage); + +static double getIdenticalDataVal(tMemBucket* pMemBucket, int32_t slotIndex) { + assert(isIdenticalData(pMemBucket, slotIndex)); + + tMemBucketSlot *pSlot = &pMemBucket->pSlots[slotIndex]; + + double finalResult = 0.0; + switch (pMemBucket->type) { + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_INT: { + finalResult = pSlot->range.iMinVal; + break; + } + + case TSDB_DATA_TYPE_FLOAT: + case TSDB_DATA_TYPE_DOUBLE: { + finalResult = pSlot->range.dMinVal; + break; }; - if (j == pMemBucket->nSlotsOfSeg) { // current slot has no available - // slot,try next segment - return getMinMaxEntryOfNearestSlotInNextSegment(pMemBucket, segIdx); - } else { - next = pSeg->pBoundingEntries[slotIdx + 1]; - assert(pSeg->pBuffer[slotIdx + 1] != NULL); + case TSDB_DATA_TYPE_BIGINT: { + finalResult = pSlot->range.i64MinVal; + break; } } - return next; + return finalResult; } -bool isIdenticalData(tMemBucket *pMemBucket, int32_t segIdx, int32_t slotIdx); -char *getFirstElemOfMemBuffer(tMemBucketSegment *pSeg, int32_t slotIdx, tFilePage *pPage); - double 
getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction) { int32_t num = 0; - for (int32_t i = 0; i < pMemBucket->numOfSegs; ++i) { - tMemBucketSegment *pSeg = &pMemBucket->pSegs[i]; - for (int32_t j = 0; j < pSeg->numOfSlots; ++j) { - if (pSeg->pBuffer == NULL || pSeg->pBuffer[j] == NULL) { - continue; - } - // required value in current slot - if (num < (count + 1) && num + pSeg->pBuffer[j]->numOfTotalElems >= (count + 1)) { - if (pSeg->pBuffer[j]->numOfTotalElems + num == (count + 1)) { - /* - * now, we need to find the minimum value of the next slot for interpolating the percentile value - * j is the last slot of current segment, we need to get the first slot of the next segment. - * - */ - MinMaxEntry next = getMinMaxEntryOfNextSlotWithData(pMemBucket, i, j); + for (int32_t i = 0; i < pMemBucket->numOfSlots; ++i) { + tMemBucketSlot *pSlot = &pMemBucket->pSlots[i]; + if (pSlot->info.size == 0) { + continue; + } - double maxOfThisSlot = 0; - double minOfNextSlot = 0; - switch (pMemBucket->dataType) { - case TSDB_DATA_TYPE_INT: - case TSDB_DATA_TYPE_SMALLINT: - case TSDB_DATA_TYPE_TINYINT: { - maxOfThisSlot = pSeg->pBoundingEntries[j].iMaxVal; - minOfNextSlot = next.iMinVal; - break; - }; - case TSDB_DATA_TYPE_FLOAT: - case TSDB_DATA_TYPE_DOUBLE: { - maxOfThisSlot = pSeg->pBoundingEntries[j].dMaxVal; - minOfNextSlot = next.dMinVal; - break; - }; - case TSDB_DATA_TYPE_BIGINT: { - maxOfThisSlot = (double)pSeg->pBoundingEntries[j].i64MaxVal; - minOfNextSlot = (double)next.i64MinVal; - break; - } + // required value in current slot + if (num < (count + 1) && num + pSlot->info.size >= (count + 1)) { + if (pSlot->info.size + num == (count + 1)) { + /* + * now, we need to find the minimum value of the next slot for interpolating the percentile value + * j is the last slot of current segment, we need to get the first slot of the next segment. 
+ */ + MinMaxEntry next = getMinMaxEntryOfNextSlotWithData(pMemBucket, i); + + double maxOfThisSlot = 0; + double minOfNextSlot = 0; + switch (pMemBucket->type) { + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_TINYINT: { + maxOfThisSlot = pSlot->range.iMaxVal; + minOfNextSlot = next.iMinVal; + break; }; - - assert(minOfNextSlot > maxOfThisSlot); - - double val = (1 - fraction) * maxOfThisSlot + fraction * minOfNextSlot; - return val; - } - if (pSeg->pBuffer[j]->numOfTotalElems <= pMemBucket->maxElemsCapacity) { - // data in buffer and file are merged together to be processed. - tFilePage *buffer = loadIntoBucketFromDisk(pMemBucket, i, j, pMemBucket->pOrderDesc); - int32_t currentIdx = count - num; - - char * thisVal = buffer->data + pMemBucket->nElemSize * currentIdx; - char * nextVal = thisVal + pMemBucket->nElemSize; - double td = 1.0, nd = 1.0; - switch (pMemBucket->dataType) { - case TSDB_DATA_TYPE_SMALLINT: { - td = *(int16_t *)thisVal; - nd = *(int16_t *)nextVal; - break; - } - case TSDB_DATA_TYPE_TINYINT: { - td = *(int8_t *)thisVal; - nd = *(int8_t *)nextVal; - break; - } - case TSDB_DATA_TYPE_INT: { - td = *(int32_t *)thisVal; - nd = *(int32_t *)nextVal; - break; - }; - case TSDB_DATA_TYPE_FLOAT: { - // td = *(float *)thisVal; - // nd = *(float *)nextVal; - td = GET_FLOAT_VAL(thisVal); - nd = GET_FLOAT_VAL(nextVal); - break; - } - case TSDB_DATA_TYPE_DOUBLE: { - // td = *(double *)thisVal; - td = GET_DOUBLE_VAL(thisVal); - // nd = *(double *)nextVal; - nd = GET_DOUBLE_VAL(nextVal); - break; - } - case TSDB_DATA_TYPE_BIGINT: { - td = (double)*(int64_t *)thisVal; - nd = (double)*(int64_t *)nextVal; - break; - } + case TSDB_DATA_TYPE_FLOAT: + case TSDB_DATA_TYPE_DOUBLE: { + maxOfThisSlot = pSlot->range.dMaxVal; + minOfNextSlot = next.dMinVal; + break; + }; + case TSDB_DATA_TYPE_BIGINT: { + maxOfThisSlot = (double)pSlot->range.i64MaxVal; + minOfNextSlot = (double)next.i64MinVal; + break; } - double val = (1 - fraction) * td + 
fraction * nd; - taosTFree(buffer); + }; - return val; - } else { // incur a second round bucket split - if (isIdenticalData(pMemBucket, i, j)) { - tExtMemBuffer *pMemBuffer = pSeg->pBuffer[j]; + assert(minOfNextSlot > maxOfThisSlot); - tFilePage *pPage = (tFilePage *)malloc(pMemBuffer->pageSize); - - char *thisVal = getFirstElemOfMemBuffer(pSeg, j, pPage); - - double finalResult = 0.0; - - switch (pMemBucket->dataType) { - case TSDB_DATA_TYPE_SMALLINT: { - finalResult = *(int16_t *)thisVal; - break; - } - case TSDB_DATA_TYPE_TINYINT: { - finalResult = *(int8_t *)thisVal; - break; - } - case TSDB_DATA_TYPE_INT: { - finalResult = *(int32_t *)thisVal; - break; - }; - case TSDB_DATA_TYPE_FLOAT: { - // finalResult = *(float *)thisVal; - finalResult = GET_FLOAT_VAL(thisVal); - break; - } - case TSDB_DATA_TYPE_DOUBLE: { - // finalResult = *(double *)thisVal; - finalResult = GET_DOUBLE_VAL(thisVal); - break; - } - case TSDB_DATA_TYPE_BIGINT: { - finalResult = (double)(*(int64_t *)thisVal); - break; - } - } - - free(pPage); - return finalResult; - } - - uDebug("MemBucket:%p,start second round bucketing", pMemBucket); - - if (pSeg->pBuffer[j]->numOfElemsInBuffer != 0) { - uDebug("MemBucket:%p,flush %d pages to disk, clear status", pMemBucket, pSeg->pBuffer[j]->numOfInMemPages); - - pMemBucket->numOfAvailPages += pSeg->pBuffer[j]->numOfInMemPages; - tExtMemBufferFlush(pSeg->pBuffer[j]); - } - - tExtMemBuffer *pMemBuffer = pSeg->pBuffer[j]; - pSeg->pBuffer[j] = NULL; - - // release all - for (int32_t tt = 0; tt < pMemBucket->numOfSegs; ++tt) { - tMemBucketSegment *pSeg = &pMemBucket->pSegs[tt]; - for (int32_t ttx = 0; ttx < pSeg->numOfSlots; ++ttx) { - if (pSeg->pBuffer && pSeg->pBuffer[ttx]) { - pSeg->pBuffer[ttx] = destoryExtMemBuffer(pSeg->pBuffer[ttx]); - } - } - } - - pMemBucket->nRange.i64MaxVal = pSeg->pBoundingEntries->i64MaxVal; - pMemBucket->nRange.i64MinVal = pSeg->pBoundingEntries->i64MinVal; - pMemBucket->numOfElems = 0; - - for (int32_t tt = 0; tt < 
pMemBucket->numOfSegs; ++tt) { - tMemBucketSegment *pSeg = &pMemBucket->pSegs[tt]; - for (int32_t ttx = 0; ttx < pSeg->numOfSlots; ++ttx) { - if (pSeg->pBoundingEntries) { - resetBoundingBox(pSeg, pMemBucket->dataType); - } - } - } - - tFilePage *pPage = (tFilePage *)malloc(pMemBuffer->pageSize); - - tFlushoutInfo *pFlushInfo = &pMemBuffer->fileMeta.flushoutData.pFlushoutInfo[0]; - assert(pFlushInfo->numOfPages == pMemBuffer->fileMeta.nFileSize); - - int32_t ret = fseek(pMemBuffer->file, pFlushInfo->startPageId * pMemBuffer->pageSize, SEEK_SET); - UNUSED(ret); - - for (uint32_t jx = 0; jx < pFlushInfo->numOfPages; ++jx) { - size_t sz = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file); - if (sz != pMemBuffer->pageSize) { - uError("MemBucket:%p, read tmp file %s failed", pMemBucket, pMemBuffer->path); - } else { - tMemBucketPut(pMemBucket, pPage->data, (int32_t)pPage->num); - } - } - - fclose(pMemBuffer->file); - if (unlink(pMemBuffer->path) != 0) { - uError("MemBucket:%p, remove tmp file %s failed", pMemBucket, pMemBuffer->path); - } - taosTFree(pMemBuffer); - taosTFree(pPage); - - return getPercentileImpl(pMemBucket, count - num, fraction); - } - } else { - num += pSeg->pBuffer[j]->numOfTotalElems; + double val = (1 - fraction) * maxOfThisSlot + fraction * minOfNextSlot; + return val; } + + if (pSlot->info.size <= pMemBucket->maxCapacity) { + // data in buffer and file are merged together to be processed. 
+ tFilePage *buffer = loadDataFromFilePage(pMemBucket, i); + int32_t currentIdx = count - num; + + char *thisVal = buffer->data + pMemBucket->bytes * currentIdx; + char *nextVal = thisVal + pMemBucket->bytes; + + double td = 1.0, nd = 1.0; + switch (pMemBucket->type) { + case TSDB_DATA_TYPE_SMALLINT: { + td = *(int16_t *)thisVal; + nd = *(int16_t *)nextVal; + break; + } + case TSDB_DATA_TYPE_TINYINT: { + td = *(int8_t *)thisVal; + nd = *(int8_t *)nextVal; + break; + } + case TSDB_DATA_TYPE_INT: { + td = *(int32_t *)thisVal; + nd = *(int32_t *)nextVal; + break; + }; + case TSDB_DATA_TYPE_FLOAT: { + td = GET_FLOAT_VAL(thisVal); + nd = GET_FLOAT_VAL(nextVal); + break; + } + case TSDB_DATA_TYPE_DOUBLE: { + td = GET_DOUBLE_VAL(thisVal); + nd = GET_DOUBLE_VAL(nextVal); + break; + } + case TSDB_DATA_TYPE_BIGINT: { + td = (double)*(int64_t *)thisVal; + nd = (double)*(int64_t *)nextVal; + break; + } + } + + double val = (1 - fraction) * td + fraction * nd; + taosTFree(buffer); + + return val; + } else { // incur a second round bucket split + if (isIdenticalData(pMemBucket, i)) { + return getIdenticalDataVal(pMemBucket, i); + } + + // try next round + pMemBucket->times += 1; + uDebug("MemBucket:%p, start next round data bucketing, time:%d", pMemBucket, pMemBucket->times); + + pMemBucket->range = pSlot->range; + pMemBucket->total = 0; + + resetSlotInfo(pMemBucket); + + int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times - 1); + SIDList list = getDataBufPagesIdList(pMemBucket->pBuffer, groupId); + assert(list->size > 0); + + for (int32_t f = 0; f < list->size; ++f) { + SPageInfo *pgInfo = *(SPageInfo **)taosArrayGet(list, f); + tFilePage *pg = getResBufPage(pMemBucket->pBuffer, pgInfo->pageId); + + tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num); + releaseResBufPageInfo(pMemBucket->pBuffer, pgInfo); + } + + return getPercentileImpl(pMemBucket, count - num, fraction); + } + } else { + num += pSlot->info.size; } } + return 0; } double 
getPercentile(tMemBucket *pMemBucket, double percent) { - if (pMemBucket->numOfElems == 0) { + if (pMemBucket->total == 0) { return 0.0; } - if (pMemBucket->numOfElems == 1) { // return the only element + // if only one elements exists, return it + if (pMemBucket->total == 1) { return findOnlyResult(pMemBucket); } percent = fabs(percent); - // validate the parameters + // find the min/max value, no need to scan all data in bucket if (fabs(percent - 100.0) < DBL_EPSILON || (percent < DBL_EPSILON)) { double minx = 0, maxx = 0; - /* - * find the min/max value, no need to scan all data in bucket - */ findMaxMinValue(pMemBucket, &maxx, &minx); return fabs(percent - 100) < DBL_EPSILON ? maxx : minx; } - double percentVal = (percent * (pMemBucket->numOfElems - 1)) / ((double)100.0); + double percentVal = (percent * (pMemBucket->total - 1)) / ((double)100.0); int32_t orderIdx = (int32_t)percentVal; // do put data by using buckets @@ -934,19 +690,18 @@ double getPercentile(tMemBucket *pMemBucket, double percent) { } /* - * check if data in one slot are all identical - * only need to compare with the bounding box + * check if data in one slot are all identical only need to compare with the bounding box */ -bool isIdenticalData(tMemBucket *pMemBucket, int32_t segIdx, int32_t slotIdx) { - tMemBucketSegment *pSeg = &pMemBucket->pSegs[segIdx]; +bool isIdenticalData(tMemBucket *pMemBucket, int32_t index) { + tMemBucketSlot *pSeg = &pMemBucket->pSlots[index]; - if (pMemBucket->dataType == TSDB_DATA_TYPE_INT || pMemBucket->dataType == TSDB_DATA_TYPE_BIGINT || - pMemBucket->dataType == TSDB_DATA_TYPE_SMALLINT || pMemBucket->dataType == TSDB_DATA_TYPE_TINYINT) { - return pSeg->pBoundingEntries[slotIdx].i64MinVal == pSeg->pBoundingEntries[slotIdx].i64MaxVal; + if (pMemBucket->type == TSDB_DATA_TYPE_INT || pMemBucket->type == TSDB_DATA_TYPE_BIGINT || + pMemBucket->type == TSDB_DATA_TYPE_SMALLINT || pMemBucket->type == TSDB_DATA_TYPE_TINYINT) { + return pSeg->range.i64MinVal == 
pSeg->range.i64MaxVal; } - if (pMemBucket->dataType == TSDB_DATA_TYPE_FLOAT || pMemBucket->dataType == TSDB_DATA_TYPE_DOUBLE) { - return fabs(pSeg->pBoundingEntries[slotIdx].dMaxVal - pSeg->pBoundingEntries[slotIdx].dMinVal) < DBL_EPSILON; + if (pMemBucket->type == TSDB_DATA_TYPE_FLOAT || pMemBucket->type == TSDB_DATA_TYPE_DOUBLE) { + return fabs(pSeg->range.dMaxVal - pSeg->range.dMinVal) < DBL_EPSILON; } return false; @@ -956,24 +711,24 @@ bool isIdenticalData(tMemBucket *pMemBucket, int32_t segIdx, int32_t slotIdx) { * get the first element of one slot into memory. * if no data of current slot in memory, load it from disk */ -char *getFirstElemOfMemBuffer(tMemBucketSegment *pSeg, int32_t slotIdx, tFilePage *pPage) { - tExtMemBuffer *pMemBuffer = pSeg->pBuffer[slotIdx]; - char * thisVal = NULL; +char *getFirstElemOfMemBuffer(tMemBucketSlot *pSeg, int32_t slotIdx, tFilePage *pPage) { +// STSBuf *pMemBuffer = pSeg->pBuffer[slotIdx]; + char *thisVal = NULL; - if (pSeg->pBuffer[slotIdx]->numOfElemsInBuffer != 0) { - thisVal = pSeg->pBuffer[slotIdx]->pHead->item.data; - } else { - /* - * no data in memory, load one page into memory - */ - tFlushoutInfo *pFlushInfo = &pMemBuffer->fileMeta.flushoutData.pFlushoutInfo[0]; - assert(pFlushInfo->numOfPages == pMemBuffer->fileMeta.nFileSize); - int32_t ret; - ret = fseek(pMemBuffer->file, pFlushInfo->startPageId * pMemBuffer->pageSize, SEEK_SET); - UNUSED(ret); - size_t sz = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file); - UNUSED(sz); - thisVal = pPage->data; - } +// if (pSeg->pBuffer[slotIdx]->numOfTotal != 0) { +//// thisVal = pSeg->pBuffer[slotIdx]->pHead->item.data; +// } else { +// /* +// * no data in memory, load one page into memory +// */ +// tFlushoutInfo *pFlushInfo = &pMemBuffer->fileMeta.flushoutData.pFlushoutInfo[0]; +// assert(pFlushInfo->numOfPages == pMemBuffer->fileMeta.nFileSize); +// int32_t ret; +// ret = fseek(pMemBuffer->file, pFlushInfo->startPageId * pMemBuffer->pageSize, SEEK_SET); +// 
UNUSED(ret); +// size_t sz = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file); +// UNUSED(sz); +// thisVal = pPage->data; +// } return thisVal; } From 580448e381aa21062a15f33df88b11c6d1ad4be8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 31 Aug 2020 16:41:16 +0800 Subject: [PATCH 2/8] [td-621] --- src/client/inc/tscUtil.h | 4 +-- src/client/src/tscAsync.c | 36 ++++++------------- src/client/src/tscLocal.c | 65 +++++++++++++++++++++++----------- src/client/src/tscLocalMerge.c | 7 ++-- src/client/src/tscServer.c | 16 +++------ src/client/src/tscSubquery.c | 50 ++++++++++++++++++++++---- src/client/src/tscUtil.c | 39 ++++++++++++-------- 7 files changed, 134 insertions(+), 83 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index f77897a74b..6bdc2c86ae 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -186,7 +186,7 @@ SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functi size_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo); SSqlExpr* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index); -void tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy); +int32_t tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy); void tscSqlExprInfoDestroy(SArray* pExprInfo); SColumn* tscColumnClone(const SColumn* src); @@ -204,7 +204,7 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId, int32_t SCond* tsGetSTableQueryCond(STagCond* pCond, uint64_t uid); void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, SBufferWriter* bw); -void tscTagCondCopy(STagCond* dest, const STagCond* src); +int32_t tscTagCondCopy(STagCond* dest, const STagCond* src); void tscTagCondRelease(STagCond* pCond); void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 4643d255dc..41aa122160 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ 
-50,7 +50,8 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const pSql->sqlstr = calloc(1, sqlLen + 1); if (pSql->sqlstr == NULL) { tscError("%p failed to malloc sql string buffer", pSql); - tscQueueAsyncError(pSql->fp, pSql->param, TSDB_CODE_TSC_OUT_OF_MEMORY); + pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; + tscQueueAsyncRes(pSql); return; } @@ -94,7 +95,6 @@ void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *pa SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); if (pSql == NULL) { tscError("failed to malloc sqlObj"); - terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; tscQueueAsyncError(fp, param, TSDB_CODE_TSC_OUT_OF_MEMORY); return; } @@ -191,7 +191,7 @@ void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRo tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchSingleRowProxy); } -void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), void *param) { +void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) { SSqlObj *pSql = (SSqlObj *)taosa; if (pSql == NULL || pSql->signature != pSql) { tscError("sql object is NULL"); @@ -209,6 +209,8 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi if (pRes->qhandle == 0) { tscError("qhandle is NULL"); pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE; + pSql->param = param; + tscQueueAsyncRes(pSql); return; } @@ -269,7 +271,10 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), if (pRes->qhandle == 0) { tscError("qhandle is NULL"); - tscQueueAsyncError(fp, param, TSDB_CODE_TSC_INVALID_QHANDLE); + pSql->param = param; + pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE; + + tscQueueAsyncRes(pSql); return; } @@ -352,36 +357,17 @@ void tscProcessFetchRow(SSchedMsg *pMsg) { void tscProcessAsyncRes(SSchedMsg *pMsg) { SSqlObj *pSql = (SSqlObj *)pMsg->ahandle; -// SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - -// void *taosres = pSql; - - // 
pCmd may be released, so cache pCmd->command -// int cmd = pCmd->command; -// int code = pRes->code; - - // in case of async insert, restore the user specified callback function -// bool shouldFree = tscShouldBeFreed(pSql); - -// if (pCmd->command == TSDB_SQL_INSERT) { -// assert(pSql->fp != NULL); assert(pSql->fp != NULL && pSql->fetchFp != NULL); -// } -// if (pSql->fp) { pSql->fp = pSql->fetchFp; (*pSql->fp)(pSql->param, pSql, pRes->code); -// } - -// if (shouldFree) { -// tscDebug("%p sqlObj is automatically freed in async res", pSql); -// tscFreeSqlObj(pSql); -// } } +// this function will be executed by queue task threads, so the terrno is not valid static void tscProcessAsyncError(SSchedMsg *pMsg) { void (*fp)() = pMsg->ahandle; + terrno = *(int32_t*) pMsg->msg; (*fp)(pMsg->thandle, NULL, *(int32_t*)pMsg->msg); } diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index caaaa5bc18..b240d357a8 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -274,7 +274,7 @@ static int32_t tscProcessDescribeTable(SSqlObj *pSql) { return tscSetValueToResObj(pSql, rowLen); } -static void tscProcessCurrentUser(SSqlObj *pSql) { +static int32_t tscProcessCurrentUser(SSqlObj *pSql) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); @@ -282,14 +282,20 @@ static void tscProcessCurrentUser(SSqlObj *pSql) { pExpr->resType = TSDB_DATA_TYPE_BINARY; char* vx = calloc(1, pExpr->resBytes); + if (vx == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + size_t size = sizeof(pSql->pTscObj->user); STR_WITH_MAXSIZE_TO_VARSTR(vx, pSql->pTscObj->user, size); tscSetLocalQueryResult(pSql, vx, pExpr->aliasName, pExpr->resType, pExpr->resBytes); free(vx); + + return TSDB_CODE_SUCCESS; } -static void tscProcessCurrentDB(SSqlObj *pSql) { +static int32_t tscProcessCurrentDB(SSqlObj *pSql) { char db[TSDB_DB_NAME_LEN] = {0}; extractDBName(pSql->pTscObj->db, db); @@ -302,6 +308,10 @@ static void 
tscProcessCurrentDB(SSqlObj *pSql) { pExpr->resBytes = TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE; char* vx = calloc(1, pExpr->resBytes); + if (vx == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + if (t == 0) { setVardataNull(vx, TSDB_DATA_TYPE_BINARY); } else { @@ -310,9 +320,11 @@ static void tscProcessCurrentDB(SSqlObj *pSql) { tscSetLocalQueryResult(pSql, vx, pExpr->aliasName, pExpr->resType, pExpr->resBytes); free(vx); + + return TSDB_CODE_SUCCESS; } -static void tscProcessServerVer(SSqlObj *pSql) { +static int32_t tscProcessServerVer(SSqlObj *pSql) { const char* v = pSql->pTscObj->sversion; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); @@ -323,13 +335,18 @@ static void tscProcessServerVer(SSqlObj *pSql) { pExpr->resBytes = (int16_t)(t + VARSTR_HEADER_SIZE); char* vx = calloc(1, pExpr->resBytes); + if (vx == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + STR_WITH_SIZE_TO_VARSTR(vx, v, (VarDataLenT)t); tscSetLocalQueryResult(pSql, vx, pExpr->aliasName, pExpr->resType, pExpr->resBytes); - taosTFree(vx); + free(vx); + return TSDB_CODE_SUCCESS; } -static void tscProcessClientVer(SSqlObj *pSql) { +static int32_t tscProcessClientVer(SSqlObj *pSql) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); @@ -339,23 +356,28 @@ static void tscProcessClientVer(SSqlObj *pSql) { pExpr->resBytes = (int16_t)(t + VARSTR_HEADER_SIZE); char* v = calloc(1, pExpr->resBytes); + if (v == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + STR_WITH_SIZE_TO_VARSTR(v, version, (VarDataLenT)t); tscSetLocalQueryResult(pSql, v, pExpr->aliasName, pExpr->resType, pExpr->resBytes); - taosTFree(v); + free(v); + return TSDB_CODE_SUCCESS; } -static void tscProcessServStatus(SSqlObj *pSql) { +static int32_t tscProcessServStatus(SSqlObj *pSql) { STscObj* pObj = pSql->pTscObj; if (pObj->pHb != NULL) { if (pObj->pHb->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { pSql->res.code = 
TSDB_CODE_RPC_NETWORK_UNAVAIL; - return; + return pSql->res.code; } } else { if (pSql->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { - return; + return pSql->res.code; } } @@ -364,6 +386,7 @@ static void tscProcessServStatus(SSqlObj *pSql) { SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); int32_t val = 1; tscSetLocalQueryResult(pSql, (char*) &val, pExpr->aliasName, TSDB_DATA_TYPE_INT, sizeof(int32_t)); + return TSDB_CODE_SUCCESS; } void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnName, int16_t type, size_t valueLength) { @@ -393,37 +416,39 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa int tscProcessLocalCmd(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; + SSqlRes *pRes = &pSql->res; if (pCmd->command == TSDB_SQL_CFG_LOCAL) { - pSql->res.code = (uint8_t)taosCfgDynamicOptions(pCmd->payload); + pRes->code = (uint8_t)taosCfgDynamicOptions(pCmd->payload); } else if (pCmd->command == TSDB_SQL_DESCRIBE_TABLE) { - pSql->res.code = (uint8_t)tscProcessDescribeTable(pSql); + pRes->code = (uint8_t)tscProcessDescribeTable(pSql); } else if (pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) { /* * set the qhandle to be 1 in order to pass the qhandle check, and to call partial release function to * free allocated resources and remove the SqlObj from sql query linked list */ - pSql->res.qhandle = 0x1; - pSql->res.numOfRows = 0; + pRes->qhandle = 0x1; + pRes->numOfRows = 0; } else if (pCmd->command == TSDB_SQL_RESET_CACHE) { taosCacheEmpty(tscCacheHandle); + pRes->code = TSDB_CODE_SUCCESS; } else if (pCmd->command == TSDB_SQL_SERV_VERSION) { - tscProcessServerVer(pSql); + pRes->code = tscProcessServerVer(pSql); } else if (pCmd->command == TSDB_SQL_CLI_VERSION) { - tscProcessClientVer(pSql); + pRes->code = tscProcessClientVer(pSql); } else if (pCmd->command == TSDB_SQL_CURRENT_USER) { - tscProcessCurrentUser(pSql); + pRes->code = tscProcessCurrentUser(pSql); } else if (pCmd->command == TSDB_SQL_CURRENT_DB) { - 
tscProcessCurrentDB(pSql); + pRes->code = tscProcessCurrentDB(pSql); } else if (pCmd->command == TSDB_SQL_SERV_STATUS) { - tscProcessServStatus(pSql); + pRes->code = tscProcessServStatus(pSql); } else { - pSql->res.code = TSDB_CODE_TSC_INVALID_SQL; + pRes->code = TSDB_CODE_TSC_INVALID_SQL; tscError("%p not support command:%d", pSql, pCmd->command); } // keep the code in local variable in order to avoid invalid read in case of async query - int32_t code = pSql->res.code; + int32_t code = pRes->code; if (code == TSDB_CODE_SUCCESS) { (*pSql->fp)(pSql->param, pSql, code); } else { diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index cabf2a6a11..759c08532a 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -67,8 +67,7 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc SQLFunctionCtx *pCtx = &pReducer->pCtx[i]; SSqlExpr * pExpr = tscSqlExprGet(pQueryInfo, i); - pCtx->aOutputBuf = - pReducer->pResultBuf->data + pExpr->offset * pReducer->resColModel->capacity; + pCtx->aOutputBuf = pReducer->pResultBuf->data + pExpr->offset * pReducer->resColModel->capacity; pCtx->order = pQueryInfo->order.order; pCtx->functionId = pExpr->functionId; @@ -160,7 +159,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd if (pMemBuffer == NULL) { tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer); - tscError("%p pMemBuffer is NULL", pMemBuffer); pRes->code = TSDB_CODE_TSC_APP_ERROR; return; @@ -168,7 +166,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd if (pDesc->pColumnModel == NULL) { tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer); - tscError("%p no local buffer or intermediate result format model", pSql); pRes->code = TSDB_CODE_TSC_APP_ERROR; return; @@ -188,7 +185,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd if (numOfFlush == 0 || numOfBuffer == 
0) { tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer); tscDebug("%p retrieved no data", pSql); - return; } @@ -279,6 +275,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd taosTFree(pReducer); return; } + param->pLocalData = pReducer->pLocalDataSrc; param->pDesc = pReducer->pDesc; param->num = pReducer->pLocalDataSrc[0]->pMemBuffer->numOfElemsPerPage; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index ecb85472fc..6dcc7086b0 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -226,17 +226,13 @@ int tscSendMsgToServer(SSqlObj *pSql) { .handle = &pSql->pRpcCtx, .code = 0 }; + // NOTE: the rpc context should be acquired before sending data to server. // Otherwise, the pSql object may have been released already during the response function, which is // processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely // cause crash. - if (pObj != NULL && pObj->signature == pObj) { - rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg); - return TSDB_CODE_SUCCESS; - } else { - //pObj->signature has been reset by other thread, ignore concurrency problem - return TSDB_CODE_TSC_CONN_KILLED; - } + rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg); + return TSDB_CODE_SUCCESS; } void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { @@ -1495,8 +1491,7 @@ int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { char *tmpData = NULL; uint32_t len = pSql->cmd.payloadLen; if (len > 0) { - tmpData = calloc(1, len); - if (NULL == tmpData) { + if ((tmpData = calloc(1, len)) == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -1541,8 +1536,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { // copy payload content to temp buff char *tmpData = 0; if (pCmd->payloadLen > 0) { - tmpData = calloc(1, pCmd->payloadLen + 1); - if (NULL == tmpData) return -1; + if ((tmpData = calloc(1, pCmd->payloadLen + 
1)) == NULL) return -1; memcpy(tmpData, pCmd->payload, pCmd->payloadLen); } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 2fb264c756..b1deeffced 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -570,8 +570,9 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar SSchema* pColSchema = tscGetTableColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId); - *s1 = taosArrayInit(p1->num, p1->tagSize); - *s2 = taosArrayInit(p2->num, p2->tagSize); + // int16_t for padding + *s1 = taosArrayInit(p1->num, p1->tagSize - sizeof(int16_t)); + *s2 = taosArrayInit(p2->num, p2->tagSize - sizeof(int16_t)); if (!(checkForDuplicateTagVal(pQueryInfo, p1, pParentSql) && checkForDuplicateTagVal(pQueryInfo, p2, pParentSql))) { return TSDB_CODE_QRY_DUP_JOIN_KEY; @@ -1039,6 +1040,10 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { int32_t numOfExprs = (int32_t)tscSqlExprNumOfExprs(pQueryInfo); pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * numOfExprs); + if (pRes->pColumnIndex == NULL) { + pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; + return; + } for (int32_t i = 0; i < numOfExprs; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); @@ -1153,6 +1158,7 @@ static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code); static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj); +// TODO int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) { SSqlCmd * pCmd = &pSql->cmd; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); @@ -1199,7 +1205,9 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter // this data needs to be transfer to support struct memset(&pNewQueryInfo->fieldsInfo, 0, sizeof(SFieldInfo)); - tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond);//pNewQueryInfo->tagCond; + if (tscTagCondCopy(&pSupporter->tagCond, 
&pNewQueryInfo->tagCond) != 0) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } pNew->cmd.numOfCols = 0; pNewQueryInfo->intervalTime = 0; @@ -1380,7 +1388,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { const uint32_t nBufferSize = (1u << 16); // 64KB - SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); pSql->numOfSubs = pTableMetaInfo->vgroupList->numOfVgroups; @@ -1395,9 +1403,20 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { } pSql->pSubs = calloc(pSql->numOfSubs, POINTER_BYTES); - + tscDebug("%p retrieved query data from %d vnode(s)", pSql, pSql->numOfSubs); SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); + + if (pSql->pSubs == NULL || pState == NULL) { + taosTFree(pState); + taosTFree(pSql->pSubs); + pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; + tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->numOfSubs); + + tscQueueAsyncRes(pSql); + return ret; + } + pState->numOfTotal = pSql->numOfSubs; pState->numOfRemain = pSql->numOfSubs; @@ -2029,8 +2048,21 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { numOfRes = (int32_t)(MIN(numOfRes, pSql->pSubs[i]->res.numOfRows)); } + if (numOfRes == 0) { + return; + } + int32_t totalSize = tscGetResRowLength(pQueryInfo->exprList); - pRes->pRsp = realloc(pRes->pRsp, numOfRes * totalSize); + + assert(numOfRes * totalSize > 0); + char* tmp = realloc(pRes->pRsp, numOfRes * totalSize); + if (tmp == NULL) { + pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; + return; + } else { + pRes->pRsp = tmp; + } + pRes->data = pRes->pRsp; char* data = pRes->data; @@ -2069,6 +2101,12 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) { pRes->buffer = calloc(numOfExprs, POINTER_BYTES); pRes->length = calloc(numOfExprs, sizeof(int32_t)); + if (pRes->tsrow == NULL || pRes->buffer == NULL || pRes->length == NULL) { + pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; + 
tscQueueAsyncRes(pSql); + return; + } + tscRestoreSQLFuncForSTableQuery(pQueryInfo); } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index b61fd7e8c9..7b09ef5902 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -254,15 +254,12 @@ int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) { pRes->numOfCols = numOfOutput; pRes->tsrow = calloc(numOfOutput, POINTER_BYTES); - pRes->length = calloc(numOfOutput, sizeof(int32_t)); // todo refactor + pRes->length = calloc(numOfOutput, sizeof(int32_t)); pRes->buffer = calloc(numOfOutput, POINTER_BYTES); // not enough memory if (pRes->tsrow == NULL || (pRes->buffer == NULL && pRes->numOfCols > 0)) { taosTFree(pRes->tsrow); - taosTFree(pRes->buffer); - taosTFree(pRes->length); - pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; return pRes->code; } @@ -281,13 +278,14 @@ void tscDestroyResPointerInfo(SSqlRes* pRes) { } taosTFree(pRes->pRsp); + taosTFree(pRes->tsrow); taosTFree(pRes->length); - + taosTFree(pRes->buffer); + taosTFree(pRes->pGroupRec); taosTFree(pRes->pColumnIndex); - taosTFree(pRes->buffer); - + if (pRes->pArithSup != NULL) { taosTFree(pRes->pArithSup->data); taosTFree(pRes->pArithSup); @@ -1052,7 +1050,7 @@ void tscSqlExprInfoDestroy(SArray* pExprInfo) { taosArrayDestroy(pExprInfo); } -void tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy) { +int32_t tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy) { assert(src != NULL && dst != NULL); size_t size = taosArrayGetSize(src); @@ -1064,7 +1062,7 @@ void tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy) if (deepcopy) { SSqlExpr* p1 = calloc(1, sizeof(SSqlExpr)); if (p1 == NULL) { - assert(0); + return -1; } *p1 = *pExpr; @@ -1078,6 +1076,8 @@ void tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy) } } } + + return 0; } SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex) { @@ -1324,11 +1324,14 @@ 
bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId, int32_t return false; } -void tscTagCondCopy(STagCond* dest, const STagCond* src) { +int32_t tscTagCondCopy(STagCond* dest, const STagCond* src) { memset(dest, 0, sizeof(STagCond)); if (src->tbnameCond.cond != NULL) { dest->tbnameCond.cond = strdup(src->tbnameCond.cond); + if (dest->tbnameCond.cond == NULL) { + return -1; + } } dest->tbnameCond.uid = src->tbnameCond.uid; @@ -1337,7 +1340,7 @@ void tscTagCondCopy(STagCond* dest, const STagCond* src) { dest->relType = src->relType; if (src->pCond == NULL) { - return; + return 0; } size_t s = taosArrayGetSize(src->pCond); @@ -1354,7 +1357,7 @@ void tscTagCondCopy(STagCond* dest, const STagCond* src) { assert(pCond->cond != NULL); c.cond = malloc(c.len); if (c.cond == NULL) { - assert(0); + return -1; } memcpy(c.cond, pCond->cond, c.len); @@ -1362,6 +1365,8 @@ void tscTagCondCopy(STagCond* dest, const STagCond* src) { taosArrayPush(dest->pCond, &c); } + + return 0; } void tscTagCondRelease(STagCond* pTagCond) { @@ -1854,7 +1859,10 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void } } - tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond); + if (tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond) != 0) { + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto _error; + } if (pQueryInfo->fillType != TSDB_FILL_NONE) { pNewQueryInfo->fillVal = malloc(pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t)); @@ -1883,7 +1891,10 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void } uint64_t uid = pTableMetaInfo->pTableMeta->id.uid; - tscSqlExprCopy(pNewQueryInfo->exprList, pQueryInfo->exprList, uid, true); + if (tscSqlExprCopy(pNewQueryInfo->exprList, pQueryInfo->exprList, uid, true) != 0) { + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto _error; + } doSetSqlExprAndResultFieldInfo(pQueryInfo, pNewQueryInfo, uid); From 9b75283859b4710b871813f94c084375580643e3 Mon Sep 17 00:00:00 2001 
From: Haojun Liao Date: Tue, 1 Sep 2020 11:40:34 +0800 Subject: [PATCH 3/8] [td-1250] --- src/client/inc/tscSubquery.h | 2 +- src/client/src/tscSubquery.c | 61 +++++++++++++++++++--------- tests/script/general/parser/join.sim | 20 ++++++++- 3 files changed, 61 insertions(+), 22 deletions(-) diff --git a/src/client/inc/tscSubquery.h b/src/client/inc/tscSubquery.h index d5833675aa..07e0580397 100644 --- a/src/client/inc/tscSubquery.h +++ b/src/client/inc/tscSubquery.h @@ -30,7 +30,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code); SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index); -int32_t tscHandleMasterJoinQuery(SSqlObj* pSql); +void tscHandleMasterJoinQuery(SSqlObj* pSql); int32_t tscHandleMasterSTableQuery(SSqlObj *pSql); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index b1deeffced..f8c4d77951 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1159,7 +1159,7 @@ static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code); static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj); // TODO -int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) { +int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) { SSqlCmd * pCmd = &pSql->cmd; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); @@ -1304,52 +1304,75 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY; } - return tscProcessSql(pNew); + return TSDB_CODE_SUCCESS; } -int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) { +void tscHandleMasterJoinQuery(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; + SSqlRes* pRes = &pSql->res; + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); assert((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0); + int32_t code 
= TSDB_CODE_SUCCESS; + // todo add test SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); if (pState == NULL) { - pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; - return pSql->res.code; + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto _error; } pState->numOfTotal = pQueryInfo->numOfTables; pState->numOfRemain = pState->numOfTotal; + bool hasEmptySub = false; + tscDebug("%p start subquery, total:%d", pSql, pQueryInfo->numOfTables); for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i); if (pSupporter == NULL) { // failed to create support struct, abort current query tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i); - pState->numOfRemain = i; - pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; - if (0 == i) { - taosTFree(pState); - } - return pSql->res.code; + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto _error; } - int32_t code = tscLaunchJoinSubquery(pSql, i, pSupporter); + code = tscCreateJoinSubquery(pSql, i, pSupporter); if (code != TSDB_CODE_SUCCESS) { // failed to create subquery object, quit query tscDestroyJoinSupporter(pSupporter); - pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; - if (0 == i) { - taosTFree(pState); - } + goto _error; + } + + SSqlObj* pSub = pSql->pSubs[i]; + STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSub->cmd, 0, 0); + if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) && (pTableMetaInfo->vgroupList->numOfVgroups == 0)) { + hasEmptySub = true; break; } } - pSql->cmd.command = (pSql->numOfSubs <= 0)? 
TSDB_SQL_RETRIEVE_EMPTY_RESULT:TSDB_SQL_TABLE_JOIN_RETRIEVE; - - return TSDB_CODE_SUCCESS; + if (hasEmptySub) { // at least one subquery is empty, do nothing and return + freeJoinSubqueryObj(pSql); + pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; + (*pSql->fp)(pSql->param, pSql, 0); + } else { + for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + SSqlObj* pSub = pSql->pSubs[i]; + if ((code = tscProcessSql(pSub)) != TSDB_CODE_SUCCESS) { + pState->numOfRemain = i - 1; // the already sent requests will continue and will not go to the error process routine + break; + } + } + + pSql->cmd.command = TSDB_SQL_TABLE_JOIN_RETRIEVE; + } + + return; + + _error: + pRes->code = code; + tscQueueAsyncRes(pSql); } static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState* pState) { diff --git a/tests/script/general/parser/join.sim b/tests/script/general/parser/join.sim index 1bb0ff5448..f17e28c1da 100644 --- a/tests/script/general/parser/join.sim +++ b/tests/script/general/parser/join.sim @@ -482,15 +482,31 @@ sql insert into um2 using m2 tags(9) values(1000001, 10)(2000000, 20); sql_error select count(*) from m1,m2 where m1.a=m2.a and m1.ts=m2.ts; -#empty table join test, add for no result join test +print ====> empty table/empty super-table join test, add for no result join test sql create database ux1; sql use ux1; sql create table m1(ts timestamp, k int) tags(a binary(12), b int); sql create table tm0 using m1 tags('abc', 1); sql create table m2(ts timestamp, k int) tags(a int, b binary(12)); + +sql select count(*) from m1, m2 where m1.ts=m2.ts and m1.b=m2.a; +if $rows != 0 then + return -1 +endi + sql create table tm2 using m2 tags(2, 'abc'); sql select count(*) from tm0, tm2 where tm0.ts=tm2.ts; -sql select count(*) from m1, m2 where m1.ts=m2.ts and m1.b=m2.a +if $rows != 0 then + return -1 +endi + +sql select count(*) from m1, m2 where m1.ts=m2.ts and m1.b=m2.a; +if $rows != 0 then + return -1 +endi + +sql drop table tm2; +sql select count(*) from m1,
m2 where m1.ts=m2.ts and m1.b=m2.a; sql drop database ux1; system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file From 2ae765caaf00be7b49c0ed14d6fe3b0323df3a32 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 1 Sep 2020 12:00:00 +0800 Subject: [PATCH 4/8] [td-1278] --- src/client/src/tscSQLParser.c | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 5600ee3be9..c0e6d7f44e 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -5807,22 +5807,34 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) { int32_t ret = TSDB_CODE_SUCCESS; for (int32_t i = 0; i < pList->nExpr; ++i) { - SSchema* pSchema = pTagSchema + i; + SSchema* pSchema = &pTagSchema[i]; + + char tagVal[TSDB_MAX_TAGS_LEN]; if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) { - // validate the length of binary - if (pList->a[i].pVar.nLen + VARSTR_HEADER_SIZE > pSchema->bytes) { + if (pList->a[i].pVar.nLen > pSchema->bytes) { tdDestroyKVRowBuilder(&kvRowBuilder); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); } } - - char tagVal[TSDB_MAX_TAGS_LEN]; + ret = tVariantDump(&(pList->a[i].pVar), tagVal, pSchema->type, true); + + // check again after the convert since it may be converted from binary to nchar. 
+ if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) { + int16_t len = varDataTLen(tagVal); + if (len > pSchema->bytes) { + tdDestroyKVRowBuilder(&kvRowBuilder); + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); + } + } + if (ret != TSDB_CODE_SUCCESS) { tdDestroyKVRowBuilder(&kvRowBuilder); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4); } + + tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal); } From fb1178c452a9a4ae694d41df8be5c5bea421ab6d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 1 Sep 2020 17:35:37 +0800 Subject: [PATCH 5/8] [td-1302] --- src/query/src/qExecutor.c | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 19324bc2cb..7604a619af 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -2070,35 +2070,36 @@ static bool needToLoadDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDat return false; } -#define PT_IN_WINDOW(_p, _w) ((_p) > (_w).skey && (_p) < (_w).ekey) - static bool overlapWithTimeWindow(SQuery* pQuery, SDataBlockInfo* pBlockInfo) { STimeWindow w = {0}; TSKEY sk = MIN(pQuery->window.skey, pQuery->window.ekey); TSKEY ek = MAX(pQuery->window.skey, pQuery->window.ekey); - if (QUERY_IS_ASC_QUERY(pQuery)) { getAlignQueryTimeWindow(pQuery, pBlockInfo->window.skey, sk, ek, &w); + assert(w.ekey >= pBlockInfo->window.skey); - if (PT_IN_WINDOW(w.ekey, pBlockInfo->window)) { + if (w.ekey < pBlockInfo->window.ekey) { return true; } while(1) { GET_NEXT_TIMEWINDOW(pQuery, &w); - if (w.skey > pBlockInfo->window.skey) { + if (w.skey > pBlockInfo->window.ekey) { break; } - if (PT_IN_WINDOW(w.skey, pBlockInfo->window) || PT_IN_WINDOW(w.ekey, pBlockInfo->window)) { + assert(w.ekey > pBlockInfo->window.ekey); + if (w.skey <= pBlockInfo->window.ekey && w.skey > pBlockInfo->window.skey) { return true; } } } else { getAlignQueryTimeWindow(pQuery, 
pBlockInfo->window.ekey, sk, ek, &w); - if (PT_IN_WINDOW(w.skey, pBlockInfo->window)) { + assert(w.skey <= pBlockInfo->window.ekey); + + if (w.skey > pBlockInfo->window.skey) { return true; } @@ -2108,7 +2109,8 @@ static bool overlapWithTimeWindow(SQuery* pQuery, SDataBlockInfo* pBlockInfo) { break; } - if (PT_IN_WINDOW(w.skey, pBlockInfo->window) || PT_IN_WINDOW(w.ekey, pBlockInfo->window)) { + assert(w.skey < pBlockInfo->window.skey); + if (w.ekey < pBlockInfo->window.ekey && w.ekey >= pBlockInfo->window.skey) { return true; } } From 88ba66dd8b127dc003c7aa6ef619861f3c9463cb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 2 Sep 2020 10:08:35 +0800 Subject: [PATCH 6/8] [td-1290] --- src/common/src/tname.c | 5 ++++- src/query/src/qExecutor.c | 14 +++++++++++++- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/src/common/src/tname.c b/src/common/src/tname.c index 960cc7d725..965a548d26 100644 --- a/src/common/src/tname.c +++ b/src/common/src/tname.c @@ -105,7 +105,10 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, in return startTime; } - int64_t start = ((startTime - intervalTime) / slidingTime + 1) * slidingTime; + int64_t delta = startTime - intervalTime; + int32_t factor = delta > 0? 
1:-1; + + int64_t start = (delta / slidingTime + factor) * slidingTime; if (!(timeUnit == 'u' || timeUnit == 'a' || timeUnit == 'm' || timeUnit == 's' || timeUnit == 'h')) { /* * here we revised the start time of day according to the local time zone, diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 7604a619af..6d74583a2c 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1909,6 +1909,15 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, bool stableQuery) { return; } + if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { + pQuery->order.order = TSDB_ORDER_ASC; + if (pQuery->window.skey > pQuery->window.ekey) { + SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); + } + + return; + } + if (isPointInterpoQuery(pQuery) && pQuery->intervalTime == 0) { if (!QUERY_IS_ASC_QUERY(pQuery)) { qDebug(msg, GET_QINFO_ADDR(pQuery), "interp", pQuery->order.order, TSDB_ORDER_ASC, pQuery->window.skey, @@ -4387,7 +4396,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo // NOTE: pTableCheckInfo need to update the query time range and the lastKey info // TODO fixme - changeExecuteScanOrder(pQInfo, false); + changeExecuteScanOrder(pQInfo, isSTableQuery); code = setupQueryHandle(tsdb, pQInfo, isSTableQuery); if (code != TSDB_CODE_SUCCESS) { @@ -6095,6 +6104,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, goto _cleanup; } + // NOTE: pTableCheckInfo need to update the query time range and the lastKey info + changeExecuteScanOrder(pQInfo, isSTableQuery); + int32_t index = 0; for(int32_t i = 0; i < numOfGroups; ++i) { From b3c76157a1ee5644bdd24898f6b5ffeea125b88e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 2 Sep 2020 10:09:58 +0800 Subject: [PATCH 7/8] [td-225] --- src/util/src/hash.c | 1 - src/util/src/tcache.c | 14 +++++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/util/src/hash.c b/src/util/src/hash.c index 7d10545ce7..2912b0a891 
100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -327,7 +327,6 @@ int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLe // no data, return directly if (pe->num == 0) { - assert(pe->next == NULL); __rd_unlock(&pHashObj->lock, pHashObj->type); return -1; } diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index dfa982b848..9df05a0ee8 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -381,10 +381,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { if (ret == 0) { if (ref > 0) { assert(pNode->pTNodeHeader == NULL); - - __cache_wr_lock(pCacheObj); taosAddToTrash(pCacheObj, pNode); - __cache_unlock(pCacheObj); } else { // ref == 0 atomic_sub_fetch_64(&pCacheObj->totalSize, pNode->size); @@ -485,18 +482,21 @@ void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) { STrashElem *pElem = calloc(1, sizeof(STrashElem)); pElem->pData = pNode; + pElem->prev = NULL; + + pNode->pTNodeHeader = pElem; + pNode->inTrashCan = true; + + __cache_wr_lock(pCacheObj); pElem->next = pCacheObj->pTrash; if (pCacheObj->pTrash) { pCacheObj->pTrash->prev = pElem; } - pElem->prev = NULL; pCacheObj->pTrash = pElem; - - pNode->inTrashCan = true; - pNode->pTNodeHeader = pElem; pCacheObj->numOfElemsInTrash++; + __cache_unlock(pCacheObj); uDebug("%s key:%p, %p move to trash, numOfElem in trash:%d", pCacheObj->name, pNode->key, pNode->data, pCacheObj->numOfElemsInTrash); From bc63c4e0bc65d999d22dbac946c24ca8ca7c1232 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 2 Sep 2020 17:07:44 +0800 Subject: [PATCH 8/8] [td-225] fix compiler error. 
--- src/query/src/qExecutor.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 08bc21c109..78632023f3 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -6161,7 +6161,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, } // NOTE: pTableCheckInfo need to update the query time range and the lastKey info - changeExecuteScanOrder(pQInfo, isSTableQuery); +// changeExecuteScanOrder(pQInfo, stableQuery); int32_t index = 0;