From 518964390989c6e3063b6d21258763f31198f395 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sat, 9 Apr 2022 16:44:31 +0800 Subject: [PATCH] extract compressed col data format --- include/common/tdatablock.h | 57 ++++++++++++-- source/dnode/vnode/src/tq/tq.c | 25 +++--- source/libs/executor/src/dataDispatcher.c | 94 ++++++----------------- 3 files changed, 93 insertions(+), 83 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 98edd9f0a5..73c06e7cc9 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -17,6 +17,7 @@ #define _TD_COMMON_EP_H_ #include "tcommon.h" +#include "tcompression.h" #include "tmsg.h" #ifdef __cplusplus @@ -115,11 +116,11 @@ static FORCE_INLINE void colDataAppendNULL(SColumnInfoData* pColumnInfoData, uin static FORCE_INLINE void colDataAppendNNULL(SColumnInfoData* pColumnInfoData, uint32_t start, size_t nRows) { if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { - for(int32_t i = start; i < start + nRows; ++i) { + for (int32_t i = start; i < start + nRows; ++i) { pColumnInfoData->varmeta.offset[i] = -1; // it is a null value of VAR type. } } else { - for(int32_t i = start; i < start + nRows; ++i) { + for (int32_t i = start; i < start + nRows; ++i) { colDataSetNull_f(pColumnInfoData->nullbitmap, i); } } @@ -135,7 +136,8 @@ static FORCE_INLINE int32_t colDataAppendInt8(SColumnInfoData* pColumnInfoData, } static FORCE_INLINE int32_t colDataAppendInt16(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int16_t* v) { - ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_SMALLINT || pColumnInfoData->info.type == TSDB_DATA_TYPE_USMALLINT); + ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_SMALLINT || + pColumnInfoData->info.type == TSDB_DATA_TYPE_USMALLINT); char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow; *(int16_t*)p = *(int16_t*)v; } @@ -164,7 +166,6 @@ static FORCE_INLINE int32_t colDataAppendDouble(SColumnInfoData* pColumnInfoData char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow; *(double*)p = *(double*)v; } - int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull); int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, uint32_t numOfRow2); @@ -200,12 +201,58 @@ void blockDataCleanup(SSDataBlock* pDataBlock); size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize); void* blockDataDestroy(SSDataBlock* pBlock); -int32_t blockDataTrimFirstNRows(SSDataBlock *pBlock, size_t n); +int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n); SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock); void blockDebugShowData(const SArray* dataBlocks); +static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32_t numOfRows, char* data, + int8_t compressed) { + int32_t colSize = colDataGetLength(pColRes, numOfRows); + return (*(tDataTypes[pColRes->info.type].compFunc))(pColRes->pData, colSize, numOfRows, data, + colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0); +} + +static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, + int8_t needCompress) { + int32_t* colSizes = (int32_t*)data; + + data += numOfCols * sizeof(int32_t); + *dataLen = (numOfCols * sizeof(int32_t)); + + int32_t numOfRows = pBlock->info.rows; + for (int32_t col = 0; col < numOfCols; ++col) { + SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, col); + + // copy the null bitmap + if (IS_VAR_DATA_TYPE(pColRes->info.type)) { + size_t metaSize = numOfRows * sizeof(int32_t); + memcpy(data, pColRes->varmeta.offset, metaSize); + data += metaSize; + (*dataLen) += metaSize; + } else { + int32_t len = BitmapLen(numOfRows); + memcpy(data, pColRes->nullbitmap, len); + data += len; + (*dataLen) += len; + } + + if (needCompress) { + colSizes[col] = blockCompressColData(pColRes, numOfRows, data, needCompress); + data += colSizes[col]; + (*dataLen) += colSizes[col]; + } else { + colSizes[col] = colDataGetLength(pColRes, numOfRows); + (*dataLen) += colSizes[col]; + memmove(data, pColRes->pData, colSizes[col]); + data += colSizes[col]; + } + + colSizes[col] = htonl(colSizes[col]); + } +} + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index beefbc3d84..0d4acd4594 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -296,7 +296,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } STqTopic* pTopic = NULL; - int sz = taosArrayGetSize(pConsumer->topics); + int32_t sz = taosArrayGetSize(pConsumer->topics); for (int32_t i = 0; i < sz; i++) { STqTopic* topic = taosArrayGet(pConsumer->topics, i); // TODO race condition @@ -316,7 +316,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { return 0; } - vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch, pTq->pVnode->vgId); + vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch, + pTq->pVnode->vgId); rsp.reqOffset = pReq->currentOffset; rsp.skipLogNum = 0; @@ -326,7 +327,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { // TODO consumerEpoch = atomic_load_32(&pConsumer->epoch); if (consumerEpoch > reqEpoch) { - vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, consumerEpoch, reqEpoch); + vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d", + consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, consumerEpoch, reqEpoch); break; } SWalReadHead* pHead; @@ -334,10 +336,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { // TODO: no more log, set timer to wait blocking time // if data inserted during waiting, launch query and // response to user - vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset); + vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, + pTq->pVnode->vgId, fetchOffset); break; } - vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, pHead->msgType); + vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, + pTq->pVnode->vgId, fetchOffset, pHead->msgType); /*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/ /*pHead = pTopic->pReadhandle->pHead;*/ if (pHead->msgType == TDMT_VND_SUBMIT) { @@ -361,7 +365,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } if (taosArrayGetSize(pRes) == 0) { - vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset); + vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId, + pReq->epoch, pTq->pVnode->vgId, fetchOffset); fetchOffset++; rsp.skipLogNum++; taosArrayDestroy(pRes); @@ -390,7 +395,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->pCont = buf; pMsg->contLen = tlen; pMsg->code = 0; - vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset, pHead->msgType, consumerId, pReq->epoch); + vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset, + pHead->msgType, consumerId, pReq->epoch); tmsgSendRsp(pMsg); taosMemoryFree(pHead); return 0; @@ -421,7 +427,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->contLen = tlen; pMsg->code = 0; tmsgSendRsp(pMsg); - vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId, pReq->epoch); + vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId, + pReq->epoch); /*}*/ return 0; @@ -432,7 +439,7 @@ int32_t tqProcessRebReq(STQ* pTq, char* msg) { terrno = TSDB_CODE_SUCCESS; tDecodeSMqMVRebReq(msg, &req); - vDebug("vg %d set from consumer %ld to consumer %ld", req.vgId, req.oldConsumerId ,req.newConsumerId); + vDebug("vg %d set from consumer %ld to consumer %ld", req.vgId, req.oldConsumerId, req.newConsumerId); STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId); ASSERT(pConsumer); ASSERT(pConsumer->consumerId == req.oldConsumerId); diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 74ec4e55a6..8a38b9ab48 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -18,14 +18,14 @@ #include "executorimpl.h" #include "planner.h" #include "tcompression.h" +#include "tdatablock.h" #include "tglobal.h" #include "tqueue.h" -#include "tdatablock.h" typedef struct SDataDispatchBuf { int32_t useSize; int32_t allocSize; - char* pData; + char* pData; } SDataDispatchBuf; typedef struct SDataCacheEntry { @@ -36,26 +36,25 @@ typedef struct SDataCacheEntry { } SDataCacheEntry; typedef struct SDataDispatchHandle { - SDataSinkHandle sink; - SDataSinkManager* pManager; + SDataSinkHandle sink; + SDataSinkManager* pManager; SDataBlockDescNode* pSchema; - STaosQueue* pDataBlocks; - SDataDispatchBuf nextOutput; - int32_t status; - bool queryEnd; - uint64_t useconds; - TdThreadMutex mutex; + STaosQueue* pDataBlocks; + SDataDispatchBuf nextOutput; + int32_t status; + bool queryEnd; + uint64_t useconds; + TdThreadMutex mutex; } SDataDispatchHandle; -static bool needCompress(const SSDataBlock* pData, const SDataBlockDescNode* pSchema) { +static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) { if (tsCompressColData < 0 || 0 == pData->info.rows) { return false; } - int32_t numOfCols = LIST_LENGTH(pSchema->pSlots); for (int32_t col = 0; col < numOfCols; ++col) { SColumnInfoData* pColRes = taosArrayGet(pData->pDataBlock, col); - int32_t colSize = pColRes->info.bytes * pData->info.rows; + int32_t colSize = pColRes->info.bytes * pData->info.rows; if (NEEDTO_COMPRESS_QUERY(colSize)) { return true; } @@ -64,51 +63,6 @@ static bool needCompress(const SSDataBlock* pData, const SDataBlockDescNode* pSc return false; } -static int32_t compressColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) { - int32_t colSize = colDataGetLength(pColRes, numOfRows); - return (*(tDataTypes[pColRes->info.type].compFunc))( - pColRes->pData, colSize, numOfRows, data, colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0); -} - -static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema, char* data, int8_t compressed, int32_t * dataLen) { - int32_t numOfCols = LIST_LENGTH(pSchema->pSlots); - int32_t * colSizes = (int32_t*)data; - - data += numOfCols * sizeof(int32_t); - *dataLen = (numOfCols * sizeof(int32_t)); - - int32_t numOfRows = pInput->pData->info.rows; - for (int32_t col = 0; col < numOfCols; ++col) { - SColumnInfoData* pColRes = taosArrayGet(pInput->pData->pDataBlock, col); - - // copy the null bitmap - if (IS_VAR_DATA_TYPE(pColRes->info.type)) { - size_t metaSize = numOfRows * sizeof(int32_t); - memcpy(data, pColRes->varmeta.offset, metaSize); - data += metaSize; - (*dataLen) += metaSize; - } else { - int32_t len = BitmapLen(numOfRows); - memcpy(data, pColRes->nullbitmap, len); - data += len; - (*dataLen) += len; - } - - if (compressed) { - colSizes[col] = compressColData(pColRes, numOfRows, data, compressed); - data += colSizes[col]; - (*dataLen) += colSizes[col]; - } else { - colSizes[col] = colDataGetLength(pColRes, numOfRows); - (*dataLen) += colSizes[col]; - memmove(data, pColRes->pData, colSizes[col]); - data += colSizes[col]; - } - - colSizes[col] = htonl(colSizes[col]); - } -} - // data format: // +----------------+--------------------------------------+-------------+-----------+-------------+-----------+ // |SDataCacheEntry | column#1 length, column#2 length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | .... @@ -117,16 +71,17 @@ static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema // The length of bitmap is decided by number of rows of this data block, and the length of each column data is // recorded in the first segment, next to the struct header static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) { + int32_t numOfCols = LIST_LENGTH(pHandle->pSchema->pSlots); + SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData; - pEntry->compressed = (int8_t)needCompress(pInput->pData, pHandle->pSchema); - pEntry->numOfRows = pInput->pData->info.rows; - pEntry->dataLen = 0; + pEntry->compressed = (int8_t)needCompress(pInput->pData, numOfCols); + pEntry->numOfRows = pInput->pData->info.rows; + pEntry->dataLen = 0; pBuf->useSize = sizeof(SRetrieveTableRsp); - copyData(pInput, pHandle->pSchema, pEntry->data, pEntry->compressed, &pEntry->dataLen); + blockCompressEncode(pInput->pData, pEntry->data, &pEntry->dataLen, numOfCols, pEntry->compressed); - pEntry->dataLen = pEntry->dataLen; - pBuf->useSize += pEntry->dataLen; + pBuf->useSize += pEntry->dataLen; } static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) { @@ -140,7 +95,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, // NOTE: there are four bytes of an integer more than the required buffer space. // struct size + data payload + length for each column + bitmap length pBuf->allocSize = sizeof(SRetrieveTableRsp) + blockDataGetSerialMetaSize(pInput->pData) + - ceil(blockDataGetSerialRowSize(pInput->pData) * pInput->pData->info.rows); + ceil(blockDataGetSerialRowSize(pInput->pData) * pInput->pData->info.rows); pBuf->pData = taosMemoryMalloc(pBuf->allocSize); if (pBuf->pData == NULL) { @@ -153,8 +108,9 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, static int32_t updateStatus(SDataDispatchHandle* pDispatcher) { taosThreadMutexLock(&pDispatcher->mutex); int32_t blockNums = taosQueueSize(pDispatcher->pDataBlocks); - int32_t status = (0 == blockNums ? DS_BUF_EMPTY : - (blockNums < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL)); + int32_t status = + (0 == blockNums ? DS_BUF_EMPTY + : (blockNums < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL)); pDispatcher->status = status; taosThreadMutexUnlock(&pDispatcher->mutex); return status; @@ -169,7 +125,7 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) { static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; - SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf)); + SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf)); if (NULL == pBuf || !allocBuf(pDispatcher, pInput, pBuf)) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } @@ -200,7 +156,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf)); taosFreeQitem(pBuf); *pLen = ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen; - *pQueryEnd = pDispatcher->queryEnd; + *pQueryEnd = pDispatcher->queryEnd; } static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {