From f582705febb3a887dc3296d03f4e04bbb44d9cc3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 8 Jun 2023 14:18:06 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/common/tdatablock.h | 9 +- source/common/src/tdatablock.c | 510 +++++++----------------- source/libs/executor/src/executorInt.c | 141 +------ source/libs/executor/src/scanoperator.c | 34 +- 4 files changed, 177 insertions(+), 517 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 33c571fc1b..1fa5b63a9a 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -186,7 +186,6 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex); int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows); -void colDataTrim(SColumnInfoData* pColumnInfoData); size_t blockDataGetNumOfCols(const SSDataBlock* pBlock); size_t blockDataGetNumOfRows(const SSDataBlock* pBlock); @@ -206,7 +205,6 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock); size_t blockDataGetSerialMetaSize(uint32_t numOfCols); int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo); -int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload); int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); @@ -235,11 +233,10 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColIn SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId); SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index); +int32_t blockGetEncodeSize(const SSDataBlock* pBlock); int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols); const char* blockDecode(SSDataBlock* pBlock, const char* pData); -void blockDebugShowDataBlock(SSDataBlock* pBlock, const char* flag); -void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag); // for debug char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf); @@ -248,9 +245,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pData char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); -static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { - return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock); -} +void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList); #ifdef __cplusplus } diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 311c79381c..fe38225ac8 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -33,7 +33,7 @@ int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRo } } -int32_t colDataGetFullLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) { +static int32_t colDataGetFullLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) { if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { return pColumnInfoData->varmeta.length + sizeof(int32_t) * numOfRows; } else { @@ -42,10 +42,6 @@ int32_t colDataGetFullLength(const SColumnInfoData* pColumnInfoData, int32_t num } } -void colDataTrim(SColumnInfoData* pColumnInfoData) { - // TODO -} - int32_t getJsonValueLen(const char* data) { int32_t dataLen = 0; if (*data == TSDB_DATA_TYPE_NULL) { @@ -820,41 +816,8 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) { return 0; } -static int32_t doAssignOneTuple(SColumnInfoData* pDstCols, int32_t numOfRows, const SSDataBlock* pSrcBlock, - int32_t tupleIndex) { - int32_t code = 0; - size_t numOfCols = taosArrayGetSize(pSrcBlock->pDataBlock); - - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pDst = &pDstCols[i]; - SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, i); - - if (pSrc->hasNull && colDataIsNull(pSrc, pSrcBlock->info.rows, tupleIndex, pSrcBlock->pBlockAgg[i])) { - code = colDataSetVal(pDst, numOfRows, NULL, true); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } else { - char* p = colDataGetData(pSrc, tupleIndex); - code = colDataSetVal(pDst, numOfRows, p, false); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } - } - - return TSDB_CODE_SUCCESS; -} - static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataBlock, const int32_t* index) { -#if 0 - for (int32_t i = 0; i < pDataBlock->info.rows; ++i) { - int32_t code = doAssignOneTuple(pCols, i, pDataBlock, index[i]); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } -#else + size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pDst = &pCols[i]; @@ -879,7 +842,7 @@ static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataB } } } -#endif + return TSDB_CODE_SUCCESS; } @@ -1039,114 +1002,6 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) { return TSDB_CODE_SUCCESS; } -#if 0 -typedef struct SHelper { - int32_t index; - union { - char* pData; - int64_t i64; - double d64; - }; -} SHelper; - -SHelper* createTupleIndex_rv(int32_t numOfRows, SArray* pOrderInfo, SSDataBlock* pBlock) { - int32_t sortValLengthPerRow = 0; - int32_t numOfCols = taosArrayGetSize(pOrderInfo); - - for (int32_t i = 0; i < numOfCols; ++i) { - SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i); - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->slotId); - pInfo->pColData = pColInfo; - sortValLengthPerRow += pColInfo->info.bytes; - } - - size_t len = sortValLengthPerRow * pBlock->info.rows; - - char* buf = taosMemoryCalloc(1, len); - SHelper* phelper = taosMemoryCalloc(numOfRows, sizeof(SHelper)); - for (int32_t i = 0; i < numOfRows; ++i) { - phelper[i].index = i; - phelper[i].pData = buf + sortValLengthPerRow * i; - } - - int32_t offset = 0; - for (int32_t i = 0; i < numOfCols; ++i) { - SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i); - for (int32_t j = 0; j < numOfRows; ++j) { - phelper[j].i64 = *(int32_t*)pInfo->pColData->pData + pInfo->pColData->info.bytes * j; - // memcpy(phelper[j].pData + offset, pInfo->pColData->pData + pInfo->pColData->info.bytes * j, - // pInfo->pColData->info.bytes); - } - - offset += pInfo->pColData->info.bytes; - } - - taosMemoryFree(buf); - return phelper; -} - -int32_t dataBlockCompar_rv(const void* p1, const void* p2, const void* param) { - const SSDataBlockSortHelper* pHelper = (const SSDataBlockSortHelper*)param; - - SHelper* left = (SHelper*)p1; - SHelper* right = (SHelper*)p2; - - SArray* pInfo = pHelper->orderInfo; - - int32_t offset = 0; - int32_t leftx = *(int32_t*)left->pData; //*(int32_t*)(left->pData + offset); - int32_t rightx = *(int32_t*)right->pData; //*(int32_t*)(right->pData + offset); - - if (leftx == rightx) { - return 0; - } else { - return (leftx < rightx) ? -1 : 1; - } - return 0; -} - -int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst) { - // Allocate the additional buffer. - int64_t p0 = taosGetTimestampUs(); - - SSDataBlockSortHelper helper = {.pDataBlock = pDataBlock, .orderInfo = pOrderInfo}; - - uint32_t rows = pDataBlock->info.rows; - SHelper* index = createTupleIndex_rv(rows, helper.orderInfo, pDataBlock); - if (index == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return terrno; - } - - taosqsort(index, rows, sizeof(SHelper), &helper, dataBlockCompar_rv); - - int64_t p1 = taosGetTimestampUs(); - SColumnInfoData* pCols = createHelpColInfoData(pDataBlock); - if (pCols == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return terrno; - } - - int64_t p2 = taosGetTimestampUs(); - - // int32_t code = blockDataAssign(pCols, pDataBlock, index); - // if (code != TSDB_CODE_SUCCESS) { - // terrno = code; - // return code; - // } - - int64_t p3 = taosGetTimestampUs(); - - copyBackToBlock(pDataBlock, pCols); - int64_t p4 = taosGetTimestampUs(); - - printf("sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64 ", rows:%d\n", p1 - p0, p2 - p1, - p3 - p2, p4 - p3, rows); - // destroyTupleIndex(index); - return 0; -} -#endif - void blockDataCleanup(SSDataBlock* pDataBlock) { blockDataEmpty(pDataBlock); SDataBlockInfo* pInfo = &pDataBlock->info; @@ -1299,6 +1154,7 @@ void* blockDataDestroy(SSDataBlock* pBlock) { return NULL; } +// todo remove it int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) { dst->info = src->info; dst->info.rows = 0; @@ -1679,16 +1535,6 @@ static void colDataKeepFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_ if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, 0, n); memset(&pColInfoData->varmeta.offset[n], 0, total - n); - } else { // reset the bitmap value - /*int32_t stopIndex = BitmapLen(n) * 8; - for(int32_t i = n; i < stopIndex; ++i) { - colDataClearNull_f(pColInfoData->nullbitmap, i); - } - - int32_t remain = BitmapLen(total) - BitmapLen(n); - if (remain > 0) { - memset(pColInfoData->nullbitmap+BitmapLen(n), 0, remain); - }*/ } } @@ -1782,32 +1628,6 @@ void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) { return (void*)buf; } -int32_t tEncodeDataBlocks(void** buf, const SArray* blocks) { - int32_t tlen = 0; - int32_t sz = taosArrayGetSize(blocks); - tlen += taosEncodeFixedI32(buf, sz); - - for (int32_t i = 0; i < sz; i++) { - SSDataBlock* pBlock = taosArrayGet(blocks, i); - tlen += tEncodeDataBlock(buf, pBlock); - } - - return tlen; -} - -void* tDecodeDataBlocks(const void* buf, SArray** blocks) { - int32_t sz; - buf = taosDecodeFixedI32(buf, &sz); - - *blocks = taosArrayInit(sz, sizeof(SSDataBlock)); - for (int32_t i = 0; i < sz; i++) { - SSDataBlock pBlock = {0}; - buf = tDecodeDataBlock(buf, &pBlock); - taosArrayPush(*blocks, &pBlock); - } - return (void*)buf; -} - static char* formatTimestamp(char* buf, int64_t val, int precision) { time_t tt; int32_t ms = 0; @@ -2060,182 +1880,6 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) return dumpBuf; } -/** - * @brief TODO: Assume that the final generated result it less than 3M - * - * @param pReq - * @param pDataBlocks - * @param vgId - * @param suid - * - */ -#if 0 -int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlock, STSchema* pTSchema, int32_t vgId, - tb_uid_t suid) { - int32_t bufSize = sizeof(SSubmitReq); - int32_t sz = 1; - for (int32_t i = 0; i < sz; ++i) { - const SDataBlockInfo* pBlkInfo = &pDataBlock->info; - - int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); - bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(colNum)); - bufSize += sizeof(SSubmitBlk); - } - - *pReq = taosMemoryCalloc(1, bufSize); - if (!(*pReq)) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; - } - void* pDataBuf = *pReq; - - int32_t msgLen = sizeof(SSubmitReq); - int32_t numOfBlks = 0; - SRowBuilder rb = {0}; - tdSRowInit(&rb, pTSchema->version); - - for (int32_t i = 0; i < sz; ++i) { - int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); - int32_t rows = pDataBlock->info.rows; - - if (colNum <= 1) { - // invalid if only with TS col - continue; - } - - if (rb.nCols != colNum) { - tdSRowSetTpInfo(&rb, colNum, pTSchema->flen); - } - - SSubmitBlk* pSubmitBlk = POINTER_SHIFT(pDataBuf, msgLen); - pSubmitBlk->suid = suid; - pSubmitBlk->uid = pDataBlock->info.id.groupId; - pSubmitBlk->numOfRows = rows; - pSubmitBlk->sversion = pTSchema->version; - - msgLen += sizeof(SSubmitBlk); - int32_t dataLen = 0; - for (int32_t j = 0; j < rows; ++j) { // iterate by row - tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen + dataLen)); // set row buf - bool isStartKey = false; - int32_t offset = 0; - for (int32_t k = 0; k < colNum; ++k) { // iterate by column - SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); - STColumn* pCol = &pTSchema->columns[k]; - void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); - switch (pColInfoData->info.type) { - case TSDB_DATA_TYPE_TIMESTAMP: - if (!isStartKey) { - isStartKey = true; - tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true, - offset, k); - continue; // offset should keep 0 for next column - - } else if (colDataIsNull_s(pColInfoData, j)) { - tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NULL, NULL, - false, offset, k); - } else { - tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, - true, offset, k); - } - break; - case TSDB_DATA_TYPE_NCHAR: - case TSDB_DATA_TYPE_VARCHAR: // TSDB_DATA_TYPE_BINARY - case TSDB_DATA_TYPE_GEOMETRY: { - if (colDataIsNull_s(pColInfoData, j)) { - tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pColInfoData->info.type, TD_VTYPE_NULL, NULL, - false, offset, k); - } else { - void* data = colDataGetData(pColInfoData, j); - tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pColInfoData->info.type, TD_VTYPE_NORM, data, - true, offset, k); - } - break; - } - case TSDB_DATA_TYPE_VARBINARY: - case TSDB_DATA_TYPE_DECIMAL: - case TSDB_DATA_TYPE_BLOB: - case TSDB_DATA_TYPE_JSON: - case TSDB_DATA_TYPE_MEDIUMBLOB: - uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type); - break; - default: - if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) { - if (colDataIsNull_s(pColInfoData, j)) { - tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NULL, NULL, false, - offset, k); - } else if (pCol->type == pColInfoData->info.type) { - tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NORM, var, true, offset, - k); - } else { - char tv[8] = {0}; - if (pColInfoData->info.type == TSDB_DATA_TYPE_FLOAT) { - float v = 0; - GET_TYPED_DATA(v, float, pColInfoData->info.type, var); - SET_TYPED_DATA(&tv, pCol->type, v); - } else if (pColInfoData->info.type == TSDB_DATA_TYPE_DOUBLE) { - double v = 0; - GET_TYPED_DATA(v, double, pColInfoData->info.type, var); - SET_TYPED_DATA(&tv, pCol->type, v); - } else if (IS_SIGNED_NUMERIC_TYPE(pColInfoData->info.type)) { - int64_t v = 0; - GET_TYPED_DATA(v, int64_t, pColInfoData->info.type, var); - SET_TYPED_DATA(&tv, pCol->type, v); - } else { - uint64_t v = 0; - GET_TYPED_DATA(v, uint64_t, pColInfoData->info.type, var); - SET_TYPED_DATA(&tv, pCol->type, v); - } - tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NORM, tv, true, offset, - k); - } - } else { - uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type); - } - break; - } - offset += TYPE_BYTES[pCol->type]; // sum/avg would convert to int64_t/uint64_t/double during aggregation - } - tdSRowEnd(&rb); - dataLen += TD_ROW_LEN(rb.pBuf); -#ifdef TD_DEBUG_PRINT_ROW - tdSRowPrint(rb.pBuf, pTSchema, __func__); -#endif - } - - ++numOfBlks; - - pSubmitBlk->dataLen = dataLen; - msgLen += pSubmitBlk->dataLen; - } - - if (numOfBlks > 0) { - (*pReq)->length = msgLen; - - (*pReq)->header.vgId = htonl(vgId); - (*pReq)->header.contLen = htonl(msgLen); - (*pReq)->length = (*pReq)->header.contLen; - (*pReq)->numOfBlocks = htonl(numOfBlks); - SSubmitBlk* blk = (SSubmitBlk*)((*pReq) + 1); - while (numOfBlks--) { - int32_t dataLen = blk->dataLen; - blk->uid = htobe64(blk->uid); - blk->suid = htobe64(blk->suid); - blk->sversion = htonl(blk->sversion); - blk->dataLen = htonl(blk->dataLen); - blk->schemaLen = htonl(blk->schemaLen); - blk->numOfRows = htonl(blk->numOfRows); - blk = (SSubmitBlk*)(blk->data + dataLen); - } - } else { - // no valid rows - taosMemoryFreeClear(*pReq); - } - - return TSDB_CODE_SUCCESS; -} -#endif - int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDataBlock, const STSchema* pTSchema, int64_t uid, int32_t vgId, tb_uid_t suid) { SSubmitReq2* pReq = *ppReq; @@ -2610,3 +2254,149 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) { ASSERT(pStart - pData == dataLen); return pStart; } + +void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList) { +// int32_t totalRows = pBlock->info.rows; + int32_t bmLen = BitmapLen(totalRows); + char* pBitmap = NULL; + int32_t maxRows = 0; + + size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); + // it is a reserved column for scalar function, and no data in this column yet. + if (pDst->pData == NULL) { + continue; + } + + int32_t numOfRows = 0; + if (IS_VAR_DATA_TYPE(pDst->info.type)) { + int32_t j = 0; + pDst->varmeta.length = 0; + + while (j < totalRows) { + if (pBoolList[j] == 0) { + j += 1; + continue; + } + + if (colDataIsNull_var(pDst, j)) { + colDataSetNull_var(pDst, numOfRows); + } else { + // fix address sanitizer error. p1 may point to memory that will change during realloc of colDataSetVal, first copy it to p2 + char* p1 = colDataGetVarData(pDst, j); + int32_t len = 0; + if (pDst->info.type == TSDB_DATA_TYPE_JSON) { + len = getJsonValueLen(p1); + } else { + len = varDataTLen(p1); + } + char* p2 = taosMemoryMalloc(len); + memcpy(p2, p1, len); + colDataSetVal(pDst, numOfRows, p2, false); + taosMemoryFree(p2); + } + numOfRows += 1; + j += 1; + } + + if (maxRows < numOfRows) { + maxRows = numOfRows; + } + } else { + if (pBitmap == NULL) { + pBitmap = taosMemoryCalloc(1, bmLen); + } + + memcpy(pBitmap, pDst->nullbitmap, bmLen); + memset(pDst->nullbitmap, 0, bmLen); + + int32_t j = 0; + + switch (pDst->info.type) { + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_UBIGINT: + case TSDB_DATA_TYPE_DOUBLE: + case TSDB_DATA_TYPE_TIMESTAMP: + while (j < totalRows) { + if (pBoolList[j] == 0) { + j += 1; + continue; + } + + if (colDataIsNull_f(pBitmap, j)) { + colDataSetNull_f(pDst->nullbitmap, numOfRows); + } else { + ((int64_t*)pDst->pData)[numOfRows] = ((int64_t*)pDst->pData)[j]; + } + numOfRows += 1; + j += 1; + } + break; + case TSDB_DATA_TYPE_FLOAT: + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_UINT: + while (j < totalRows) { + if (pBoolList[j] == 0) { + j += 1; + continue; + } + if (colDataIsNull_f(pBitmap, j)) { + colDataSetNull_f(pDst->nullbitmap, numOfRows); + } else { + ((int32_t*)pDst->pData)[numOfRows] = ((int32_t*)pDst->pData)[j]; + } + numOfRows += 1; + j += 1; + } + break; + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_USMALLINT: + while (j < totalRows) { + if (pBoolList[j] == 0) { + j += 1; + continue; + } + if (colDataIsNull_f(pBitmap, j)) { + colDataSetNull_f(pDst->nullbitmap, numOfRows); + } else { + ((int16_t*)pDst->pData)[numOfRows] = ((int16_t*)pDst->pData)[j]; + } + numOfRows += 1; + j += 1; + } + break; + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_UTINYINT: + while (j < totalRows) { + if (pBoolList[j] == 0) { + j += 1; + continue; + } + if (colDataIsNull_f(pBitmap, j)) { + colDataSetNull_f(pDst->nullbitmap, numOfRows); + } else { + ((int8_t*)pDst->pData)[numOfRows] = ((int8_t*)pDst->pData)[j]; + } + numOfRows += 1; + j += 1; + } + break; + } + } + + if (maxRows < numOfRows) { + maxRows = numOfRows; + } + } + + pBlock->info.rows = maxRows; + if (pBitmap != NULL) { + taosMemoryFree(pBitmap); + } +} + +int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { + return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock); +} \ No newline at end of file diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 4f1a0254e4..2606d0ad0c 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -540,151 +540,12 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoD } int8_t* pIndicator = (int8_t*)p->pData; - int32_t totalRows = pBlock->info.rows; - if (status == FILTER_RESULT_ALL_QUALIFIED) { // here nothing needs to be done } else if (status == FILTER_RESULT_NONE_QUALIFIED) { pBlock->info.rows = 0; } else { - int32_t bmLen = BitmapLen(totalRows); - char* pBitmap = NULL; - int32_t maxRows = 0; - - size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); - // it is a reserved column for scalar function, and no data in this column yet. - if (pDst->pData == NULL) { - continue; - } - - int32_t numOfRows = 0; - if (IS_VAR_DATA_TYPE(pDst->info.type)) { - int32_t j = 0; - pDst->varmeta.length = 0; - - while (j < totalRows) { - if (pIndicator[j] == 0) { - j += 1; - continue; - } - - if (colDataIsNull_var(pDst, j)) { - colDataSetNull_var(pDst, numOfRows); - } else { - // fix address sanitizer error. p1 may point to memory that will change during realloc of colDataSetVal, first copy it to p2 - char* p1 = colDataGetVarData(pDst, j); - int32_t len = 0; - if (pDst->info.type == TSDB_DATA_TYPE_JSON) { - len = getJsonValueLen(p1); - } else { - len = varDataTLen(p1); - } - char* p2 = taosMemoryMalloc(len); - memcpy(p2, p1, len); - colDataSetVal(pDst, numOfRows, p2, false); - taosMemoryFree(p2); - } - numOfRows += 1; - j += 1; - } - - if (maxRows < numOfRows) { - maxRows = numOfRows; - } - } else { - if (pBitmap == NULL) { - pBitmap = taosMemoryCalloc(1, bmLen); - } - - memcpy(pBitmap, pDst->nullbitmap, bmLen); - memset(pDst->nullbitmap, 0, bmLen); - - int32_t j = 0; - - switch (pDst->info.type) { - case TSDB_DATA_TYPE_BIGINT: - case TSDB_DATA_TYPE_UBIGINT: - case TSDB_DATA_TYPE_DOUBLE: - case TSDB_DATA_TYPE_TIMESTAMP: - while (j < totalRows) { - if (pIndicator[j] == 0) { - j += 1; - continue; - } - - if (colDataIsNull_f(pBitmap, j)) { - colDataSetNull_f(pDst->nullbitmap, numOfRows); - } else { - ((int64_t*)pDst->pData)[numOfRows] = ((int64_t*)pDst->pData)[j]; - } - numOfRows += 1; - j += 1; - } - break; - case TSDB_DATA_TYPE_FLOAT: - case TSDB_DATA_TYPE_INT: - case TSDB_DATA_TYPE_UINT: - while (j < totalRows) { - if (pIndicator[j] == 0) { - j += 1; - continue; - } - if (colDataIsNull_f(pBitmap, j)) { - colDataSetNull_f(pDst->nullbitmap, numOfRows); - } else { - ((int32_t*)pDst->pData)[numOfRows] = ((int32_t*)pDst->pData)[j]; - } - numOfRows += 1; - j += 1; - } - break; - case TSDB_DATA_TYPE_SMALLINT: - case TSDB_DATA_TYPE_USMALLINT: - while (j < totalRows) { - if (pIndicator[j] == 0) { - j += 1; - continue; - } - if (colDataIsNull_f(pBitmap, j)) { - colDataSetNull_f(pDst->nullbitmap, numOfRows); - } else { - ((int16_t*)pDst->pData)[numOfRows] = ((int16_t*)pDst->pData)[j]; - } - numOfRows += 1; - j += 1; - } - break; - case TSDB_DATA_TYPE_BOOL: - case TSDB_DATA_TYPE_TINYINT: - case TSDB_DATA_TYPE_UTINYINT: - while (j < totalRows) { - if (pIndicator[j] == 0) { - j += 1; - continue; - } - if (colDataIsNull_f(pBitmap, j)) { - colDataSetNull_f(pDst->nullbitmap, numOfRows); - } else { - ((int8_t*)pDst->pData)[numOfRows] = ((int8_t*)pDst->pData)[j]; - } - numOfRows += 1; - j += 1; - } - break; - } - } - - if (maxRows < numOfRows) { - maxRows = numOfRows; - } - } - - pBlock->info.rows = maxRows; - if (pBitmap != NULL) { - taosMemoryFree(pBitmap); - } + trimDataBlock(pBlock, pBlock->info.rows, (bool*) pIndicator); } } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c892617783..4434a29870 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -485,12 +485,12 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int } int32_t code = 0; + bool freeReader = false; // backup the rows int32_t backupRows = pBlock->info.rows; pBlock->info.rows = rows; - bool freeReader = false; STableCachedVal val = {0}; SMetaReader mr = {0}; @@ -1553,13 +1553,13 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows); - pInfo->pRes->info.rows = pBlock->info.rows; - pInfo->pRes->info.id.uid = pBlock->info.id.uid; - pInfo->pRes->info.type = STREAM_NORMAL; - pInfo->pRes->info.version = pBlock->info.version; + pBlockInfo->rows = pBlock->info.rows; + pBlockInfo->id.uid = pBlock->info.id.uid; + pBlockInfo->type = STREAM_NORMAL; + pBlockInfo->version = pBlock->info.version; STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - pInfo->pRes->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid); + pBlockInfo->id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid); // todo extract method for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) { @@ -1589,7 +1589,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock // currently only the tbname pseudo column if (pInfo->numOfPseudoExpr > 0) { int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, - pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), &pTableScanInfo->base.metaCache); + pBlockInfo->rows, GET_TASKID(pTaskInfo), &pTableScanInfo->base.metaCache); // ignore the table not exists error, since this table may have been dropped during the scan procedure. if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) { blockDataFreeRes((SSDataBlock*)pBlock); @@ -1606,7 +1606,6 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock pInfo->pRes->info.dataLoad = 1; blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); -// blockDataFreeRes((SSDataBlock*)pBlock); calBlockTbName(pInfo, pInfo->pRes); return 0; @@ -2088,11 +2087,26 @@ FETCH_NEXT_BLOCK: return pInfo->pCreateTbRes; } - // todo apply time window range filter - doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock); doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL); + { // do additional time window filter + STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow; + + if (pWindow->skey != 0) { + bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool)); + + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); + for(int32_t i = 0; i < pBlock->info.rows; ++i) { + int64_t* ts = (int64_t*) colDataGetData(pCol, i); + p[i] = (*ts >= pWindow->skey); + } + + trimDataBlock(pBlock, pBlock->info.rows, p); + taosMemoryFree(p); + } + } + pBlock->info.dataLoad = 1; blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);