Merge pull request #11347 from taosdata/feature/tq
extract compressed col data format
This commit is contained in:
commit
e2600ef86c
|
@ -17,6 +17,7 @@
|
||||||
#define _TD_COMMON_EP_H_
|
#define _TD_COMMON_EP_H_
|
||||||
|
|
||||||
#include "tcommon.h"
|
#include "tcommon.h"
|
||||||
|
#include "tcompression.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
@ -115,11 +116,11 @@ static FORCE_INLINE void colDataAppendNULL(SColumnInfoData* pColumnInfoData, uin
|
||||||
|
|
||||||
static FORCE_INLINE void colDataAppendNNULL(SColumnInfoData* pColumnInfoData, uint32_t start, size_t nRows) {
|
static FORCE_INLINE void colDataAppendNNULL(SColumnInfoData* pColumnInfoData, uint32_t start, size_t nRows) {
|
||||||
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
||||||
for(int32_t i = start; i < start + nRows; ++i) {
|
for (int32_t i = start; i < start + nRows; ++i) {
|
||||||
pColumnInfoData->varmeta.offset[i] = -1; // it is a null value of VAR type.
|
pColumnInfoData->varmeta.offset[i] = -1; // it is a null value of VAR type.
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for(int32_t i = start; i < start + nRows; ++i) {
|
for (int32_t i = start; i < start + nRows; ++i) {
|
||||||
colDataSetNull_f(pColumnInfoData->nullbitmap, i);
|
colDataSetNull_f(pColumnInfoData->nullbitmap, i);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -200,12 +201,58 @@ void blockDataCleanup(SSDataBlock* pDataBlock);
|
||||||
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
|
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
|
||||||
void* blockDataDestroy(SSDataBlock* pBlock);
|
void* blockDataDestroy(SSDataBlock* pBlock);
|
||||||
|
|
||||||
int32_t blockDataTrimFirstNRows(SSDataBlock *pBlock, size_t n);
|
int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n);
|
||||||
|
|
||||||
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock);
|
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock);
|
||||||
|
|
||||||
void blockDebugShowData(const SArray* dataBlocks);
|
void blockDebugShowData(const SArray* dataBlocks);
|
||||||
|
|
||||||
|
static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32_t numOfRows, char* data,
|
||||||
|
int8_t compressed) {
|
||||||
|
int32_t colSize = colDataGetLength(pColRes, numOfRows);
|
||||||
|
return (*(tDataTypes[pColRes->info.type].compFunc))(pColRes->pData, colSize, numOfRows, data,
|
||||||
|
colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols,
|
||||||
|
int8_t needCompress) {
|
||||||
|
int32_t* colSizes = (int32_t*)data;
|
||||||
|
|
||||||
|
data += numOfCols * sizeof(int32_t);
|
||||||
|
*dataLen = (numOfCols * sizeof(int32_t));
|
||||||
|
|
||||||
|
int32_t numOfRows = pBlock->info.rows;
|
||||||
|
for (int32_t col = 0; col < numOfCols; ++col) {
|
||||||
|
SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, col);
|
||||||
|
|
||||||
|
// copy the null bitmap
|
||||||
|
if (IS_VAR_DATA_TYPE(pColRes->info.type)) {
|
||||||
|
size_t metaSize = numOfRows * sizeof(int32_t);
|
||||||
|
memcpy(data, pColRes->varmeta.offset, metaSize);
|
||||||
|
data += metaSize;
|
||||||
|
(*dataLen) += metaSize;
|
||||||
|
} else {
|
||||||
|
int32_t len = BitmapLen(numOfRows);
|
||||||
|
memcpy(data, pColRes->nullbitmap, len);
|
||||||
|
data += len;
|
||||||
|
(*dataLen) += len;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (needCompress) {
|
||||||
|
colSizes[col] = blockCompressColData(pColRes, numOfRows, data, needCompress);
|
||||||
|
data += colSizes[col];
|
||||||
|
(*dataLen) += colSizes[col];
|
||||||
|
} else {
|
||||||
|
colSizes[col] = colDataGetLength(pColRes, numOfRows);
|
||||||
|
(*dataLen) += colSizes[col];
|
||||||
|
memmove(data, pColRes->pData, colSizes[col]);
|
||||||
|
data += colSizes[col];
|
||||||
|
}
|
||||||
|
|
||||||
|
colSizes[col] = htonl(colSizes[col]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -296,7 +296,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
STqTopic* pTopic = NULL;
|
STqTopic* pTopic = NULL;
|
||||||
int sz = taosArrayGetSize(pConsumer->topics);
|
int32_t sz = taosArrayGetSize(pConsumer->topics);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
STqTopic* topic = taosArrayGet(pConsumer->topics, i);
|
STqTopic* topic = taosArrayGet(pConsumer->topics, i);
|
||||||
// TODO race condition
|
// TODO race condition
|
||||||
|
@ -316,7 +316,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch, pTq->pVnode->vgId);
|
vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch,
|
||||||
|
pTq->pVnode->vgId);
|
||||||
|
|
||||||
rsp.reqOffset = pReq->currentOffset;
|
rsp.reqOffset = pReq->currentOffset;
|
||||||
rsp.skipLogNum = 0;
|
rsp.skipLogNum = 0;
|
||||||
|
@ -326,7 +327,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
// TODO
|
// TODO
|
||||||
consumerEpoch = atomic_load_32(&pConsumer->epoch);
|
consumerEpoch = atomic_load_32(&pConsumer->epoch);
|
||||||
if (consumerEpoch > reqEpoch) {
|
if (consumerEpoch > reqEpoch) {
|
||||||
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, consumerEpoch, reqEpoch);
|
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d",
|
||||||
|
consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, consumerEpoch, reqEpoch);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
SWalReadHead* pHead;
|
SWalReadHead* pHead;
|
||||||
|
@ -334,10 +336,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
// TODO: no more log, set timer to wait blocking time
|
// TODO: no more log, set timer to wait blocking time
|
||||||
// if data inserted during waiting, launch query and
|
// if data inserted during waiting, launch query and
|
||||||
// response to user
|
// response to user
|
||||||
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset);
|
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
|
||||||
|
pTq->pVnode->vgId, fetchOffset);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, pHead->msgType);
|
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
|
||||||
|
pTq->pVnode->vgId, fetchOffset, pHead->msgType);
|
||||||
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
|
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
|
||||||
/*pHead = pTopic->pReadhandle->pHead;*/
|
/*pHead = pTopic->pReadhandle->pHead;*/
|
||||||
if (pHead->msgType == TDMT_VND_SUBMIT) {
|
if (pHead->msgType == TDMT_VND_SUBMIT) {
|
||||||
|
@ -361,7 +365,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosArrayGetSize(pRes) == 0) {
|
if (taosArrayGetSize(pRes) == 0) {
|
||||||
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset);
|
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId,
|
||||||
|
pReq->epoch, pTq->pVnode->vgId, fetchOffset);
|
||||||
fetchOffset++;
|
fetchOffset++;
|
||||||
rsp.skipLogNum++;
|
rsp.skipLogNum++;
|
||||||
taosArrayDestroy(pRes);
|
taosArrayDestroy(pRes);
|
||||||
|
@ -390,7 +395,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
pMsg->pCont = buf;
|
pMsg->pCont = buf;
|
||||||
pMsg->contLen = tlen;
|
pMsg->contLen = tlen;
|
||||||
pMsg->code = 0;
|
pMsg->code = 0;
|
||||||
vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset, pHead->msgType, consumerId, pReq->epoch);
|
vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset,
|
||||||
|
pHead->msgType, consumerId, pReq->epoch);
|
||||||
tmsgSendRsp(pMsg);
|
tmsgSendRsp(pMsg);
|
||||||
taosMemoryFree(pHead);
|
taosMemoryFree(pHead);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -421,7 +427,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
pMsg->contLen = tlen;
|
pMsg->contLen = tlen;
|
||||||
pMsg->code = 0;
|
pMsg->code = 0;
|
||||||
tmsgSendRsp(pMsg);
|
tmsgSendRsp(pMsg);
|
||||||
vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId, pReq->epoch);
|
vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId,
|
||||||
|
pReq->epoch);
|
||||||
/*}*/
|
/*}*/
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -432,7 +439,7 @@ int32_t tqProcessRebReq(STQ* pTq, char* msg) {
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
tDecodeSMqMVRebReq(msg, &req);
|
tDecodeSMqMVRebReq(msg, &req);
|
||||||
|
|
||||||
vDebug("vg %d set from consumer %ld to consumer %ld", req.vgId, req.oldConsumerId ,req.newConsumerId);
|
vDebug("vg %d set from consumer %ld to consumer %ld", req.vgId, req.oldConsumerId, req.newConsumerId);
|
||||||
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId);
|
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId);
|
||||||
ASSERT(pConsumer);
|
ASSERT(pConsumer);
|
||||||
ASSERT(pConsumer->consumerId == req.oldConsumerId);
|
ASSERT(pConsumer->consumerId == req.oldConsumerId);
|
||||||
|
|
|
@ -18,9 +18,9 @@
|
||||||
#include "executorimpl.h"
|
#include "executorimpl.h"
|
||||||
#include "planner.h"
|
#include "planner.h"
|
||||||
#include "tcompression.h"
|
#include "tcompression.h"
|
||||||
|
#include "tdatablock.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
#include "tqueue.h"
|
#include "tqueue.h"
|
||||||
#include "tdatablock.h"
|
|
||||||
|
|
||||||
typedef struct SDataDispatchBuf {
|
typedef struct SDataDispatchBuf {
|
||||||
int32_t useSize;
|
int32_t useSize;
|
||||||
|
@ -47,12 +47,11 @@ typedef struct SDataDispatchHandle {
|
||||||
TdThreadMutex mutex;
|
TdThreadMutex mutex;
|
||||||
} SDataDispatchHandle;
|
} SDataDispatchHandle;
|
||||||
|
|
||||||
static bool needCompress(const SSDataBlock* pData, const SDataBlockDescNode* pSchema) {
|
static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) {
|
||||||
if (tsCompressColData < 0 || 0 == pData->info.rows) {
|
if (tsCompressColData < 0 || 0 == pData->info.rows) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfCols = LIST_LENGTH(pSchema->pSlots);
|
|
||||||
for (int32_t col = 0; col < numOfCols; ++col) {
|
for (int32_t col = 0; col < numOfCols; ++col) {
|
||||||
SColumnInfoData* pColRes = taosArrayGet(pData->pDataBlock, col);
|
SColumnInfoData* pColRes = taosArrayGet(pData->pDataBlock, col);
|
||||||
int32_t colSize = pColRes->info.bytes * pData->info.rows;
|
int32_t colSize = pColRes->info.bytes * pData->info.rows;
|
||||||
|
@ -64,51 +63,6 @@ static bool needCompress(const SSDataBlock* pData, const SDataBlockDescNode* pSc
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t compressColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) {
|
|
||||||
int32_t colSize = colDataGetLength(pColRes, numOfRows);
|
|
||||||
return (*(tDataTypes[pColRes->info.type].compFunc))(
|
|
||||||
pColRes->pData, colSize, numOfRows, data, colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema, char* data, int8_t compressed, int32_t * dataLen) {
|
|
||||||
int32_t numOfCols = LIST_LENGTH(pSchema->pSlots);
|
|
||||||
int32_t * colSizes = (int32_t*)data;
|
|
||||||
|
|
||||||
data += numOfCols * sizeof(int32_t);
|
|
||||||
*dataLen = (numOfCols * sizeof(int32_t));
|
|
||||||
|
|
||||||
int32_t numOfRows = pInput->pData->info.rows;
|
|
||||||
for (int32_t col = 0; col < numOfCols; ++col) {
|
|
||||||
SColumnInfoData* pColRes = taosArrayGet(pInput->pData->pDataBlock, col);
|
|
||||||
|
|
||||||
// copy the null bitmap
|
|
||||||
if (IS_VAR_DATA_TYPE(pColRes->info.type)) {
|
|
||||||
size_t metaSize = numOfRows * sizeof(int32_t);
|
|
||||||
memcpy(data, pColRes->varmeta.offset, metaSize);
|
|
||||||
data += metaSize;
|
|
||||||
(*dataLen) += metaSize;
|
|
||||||
} else {
|
|
||||||
int32_t len = BitmapLen(numOfRows);
|
|
||||||
memcpy(data, pColRes->nullbitmap, len);
|
|
||||||
data += len;
|
|
||||||
(*dataLen) += len;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (compressed) {
|
|
||||||
colSizes[col] = compressColData(pColRes, numOfRows, data, compressed);
|
|
||||||
data += colSizes[col];
|
|
||||||
(*dataLen) += colSizes[col];
|
|
||||||
} else {
|
|
||||||
colSizes[col] = colDataGetLength(pColRes, numOfRows);
|
|
||||||
(*dataLen) += colSizes[col];
|
|
||||||
memmove(data, pColRes->pData, colSizes[col]);
|
|
||||||
data += colSizes[col];
|
|
||||||
}
|
|
||||||
|
|
||||||
colSizes[col] = htonl(colSizes[col]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// data format:
|
// data format:
|
||||||
// +----------------+--------------------------------------+-------------+-----------+-------------+-----------+
|
// +----------------+--------------------------------------+-------------+-----------+-------------+-----------+
|
||||||
// |SDataCacheEntry | column#1 length, column#2 length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | ....
|
// |SDataCacheEntry | column#1 length, column#2 length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | ....
|
||||||
|
@ -117,15 +71,16 @@ static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema
|
||||||
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is
|
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is
|
||||||
// recorded in the first segment, next to the struct header
|
// recorded in the first segment, next to the struct header
|
||||||
static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
|
static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
|
||||||
|
int32_t numOfCols = LIST_LENGTH(pHandle->pSchema->pSlots);
|
||||||
|
|
||||||
SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
|
SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
|
||||||
pEntry->compressed = (int8_t)needCompress(pInput->pData, pHandle->pSchema);
|
pEntry->compressed = (int8_t)needCompress(pInput->pData, numOfCols);
|
||||||
pEntry->numOfRows = pInput->pData->info.rows;
|
pEntry->numOfRows = pInput->pData->info.rows;
|
||||||
pEntry->dataLen = 0;
|
pEntry->dataLen = 0;
|
||||||
|
|
||||||
pBuf->useSize = sizeof(SRetrieveTableRsp);
|
pBuf->useSize = sizeof(SRetrieveTableRsp);
|
||||||
copyData(pInput, pHandle->pSchema, pEntry->data, pEntry->compressed, &pEntry->dataLen);
|
blockCompressEncode(pInput->pData, pEntry->data, &pEntry->dataLen, numOfCols, pEntry->compressed);
|
||||||
|
|
||||||
pEntry->dataLen = pEntry->dataLen;
|
|
||||||
pBuf->useSize += pEntry->dataLen;
|
pBuf->useSize += pEntry->dataLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -153,8 +108,9 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
|
||||||
static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
|
static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
|
||||||
taosThreadMutexLock(&pDispatcher->mutex);
|
taosThreadMutexLock(&pDispatcher->mutex);
|
||||||
int32_t blockNums = taosQueueSize(pDispatcher->pDataBlocks);
|
int32_t blockNums = taosQueueSize(pDispatcher->pDataBlocks);
|
||||||
int32_t status = (0 == blockNums ? DS_BUF_EMPTY :
|
int32_t status =
|
||||||
(blockNums < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL));
|
(0 == blockNums ? DS_BUF_EMPTY
|
||||||
|
: (blockNums < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL));
|
||||||
pDispatcher->status = status;
|
pDispatcher->status = status;
|
||||||
taosThreadMutexUnlock(&pDispatcher->mutex);
|
taosThreadMutexUnlock(&pDispatcher->mutex);
|
||||||
return status;
|
return status;
|
||||||
|
|
Loading…
Reference in New Issue