236 lines
8.7 KiB
C
236 lines
8.7 KiB
C
/*
|
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
*
|
|
* This program is free software: you can use, redistribute, and/or modify
|
|
* it under the terms of the GNU Affero General Public License, version 3
|
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
*
|
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
*
|
|
* You should have received a copy of the GNU Affero General Public License
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
#include "dataSinkInt.h"
|
|
#include "dataSinkMgt.h"
|
|
#include "planner.h"
|
|
#include "tcompression.h"
|
|
#include "tglobal.h"
|
|
#include "tqueue.h"
|
|
#include "executorimpl.h"
|
|
|
|
typedef struct SDataDispatchBuf {
|
|
int32_t useSize;
|
|
int32_t allocSize;
|
|
char* pData;
|
|
} SDataDispatchBuf;
|
|
|
|
typedef struct SDataCacheEntry {
|
|
int32_t dataLen;
|
|
int32_t numOfRows;
|
|
int8_t compressed;
|
|
char data[];
|
|
} SDataCacheEntry;
|
|
|
|
typedef struct SDataDispatchHandle {
|
|
SDataSinkHandle sink;
|
|
SDataSinkManager* pManager;
|
|
SDataBlockSchema schema;
|
|
STaosQueue* pDataBlocks;
|
|
SDataDispatchBuf nextOutput;
|
|
int32_t status;
|
|
bool queryEnd;
|
|
uint64_t useconds;
|
|
pthread_mutex_t mutex;
|
|
} SDataDispatchHandle;
|
|
|
|
static bool needCompress(const SSDataBlock* pData, const SDataBlockSchema* pSchema) {
|
|
if (tsCompressColData < 0 || 0 == pData->info.rows) {
|
|
return false;
|
|
}
|
|
|
|
for (int32_t col = 0; col < pSchema->numOfCols; ++col) {
|
|
SColumnInfoData* pColRes = taosArrayGet(pData->pDataBlock, col);
|
|
int32_t colSize = pColRes->info.bytes * pData->info.rows;
|
|
if (NEEDTO_COMPRESS_QUERY(colSize)) {
|
|
return true;
|
|
}
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
static int32_t compressColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) {
|
|
int32_t colSize = pColRes->info.bytes * numOfRows;
|
|
return (*(tDataTypes[pColRes->info.type].compFunc))(
|
|
pColRes->pData, colSize, numOfRows, data, colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
|
|
}
|
|
|
|
static void copyData(const SInputData* pInput, const SDataBlockSchema* pSchema, char* data, int8_t compressed, int32_t *compLen) {
|
|
int32_t *compSizes = (int32_t*)data;
|
|
if (compressed) {
|
|
data += pSchema->numOfCols * sizeof(int32_t);
|
|
}
|
|
|
|
for (int32_t col = 0; col < pSchema->numOfCols; ++col) {
|
|
SColumnInfoData* pColRes = taosArrayGet(pInput->pData->pDataBlock, col);
|
|
if (compressed) {
|
|
compSizes[col] = compressColData(pColRes, pInput->pData->info.rows, data, compressed);
|
|
data += compSizes[col];
|
|
*compLen += compSizes[col];
|
|
compSizes[col] = htonl(compSizes[col]);
|
|
} else {
|
|
memmove(data, pColRes->pData, pColRes->info.bytes * pInput->pData->info.rows);
|
|
data += pColRes->info.bytes * pInput->pData->info.rows;
|
|
}
|
|
}
|
|
}
|
|
|
|
// data format with compress: SDataCacheEntry | cols_data_offset | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ...
|
|
// data format: SDataCacheEntry | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ...
|
|
static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
|
|
SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
|
|
pEntry->compressed = (int8_t)needCompress(pInput->pData, &(pHandle->schema));
|
|
pEntry->numOfRows = pInput->pData->info.rows;
|
|
pEntry->dataLen = 0;
|
|
|
|
pBuf->useSize = sizeof(SRetrieveTableRsp);
|
|
copyData(pInput, &pHandle->schema, pEntry->data, pEntry->compressed, &pEntry->dataLen);
|
|
if (0 == pEntry->compressed) {
|
|
pEntry->dataLen = pHandle->schema.resultRowSize * pInput->pData->info.rows;
|
|
}
|
|
pBuf->useSize += pEntry->dataLen;
|
|
// todo completed
|
|
}
|
|
|
|
static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
|
|
uint32_t capacity = pDispatcher->pManager->cfg.maxDataBlockNumPerQuery;
|
|
if (taosQueueSize(pDispatcher->pDataBlocks) > capacity) {
|
|
qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity,
|
|
taosQueueSize(pDispatcher->pDataBlocks));
|
|
return false;
|
|
}
|
|
|
|
pBuf->allocSize = sizeof(SRetrieveTableRsp) + pDispatcher->schema.resultRowSize * pInput->pData->info.rows;
|
|
pBuf->pData = malloc(pBuf->allocSize);
|
|
if (pBuf->pData == NULL) {
|
|
qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno));
|
|
}
|
|
|
|
return NULL != pBuf->pData;
|
|
}
|
|
|
|
static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
|
|
pthread_mutex_lock(&pDispatcher->mutex);
|
|
int32_t blockNums = taosQueueSize(pDispatcher->pDataBlocks);
|
|
int32_t status = (0 == blockNums ? DS_BUF_EMPTY :
|
|
(blockNums < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL));
|
|
pDispatcher->status = status;
|
|
pthread_mutex_unlock(&pDispatcher->mutex);
|
|
return status;
|
|
}
|
|
|
|
static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
|
|
pthread_mutex_lock(&pDispatcher->mutex);
|
|
int32_t status = pDispatcher->status;
|
|
pthread_mutex_unlock(&pDispatcher->mutex);
|
|
return status;
|
|
}
|
|
|
|
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
|
|
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
|
SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf));
|
|
if (NULL == pBuf || !allocBuf(pDispatcher, pInput, pBuf)) {
|
|
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
|
}
|
|
toDataCacheEntry(pDispatcher, pInput, pBuf);
|
|
taosWriteQitem(pDispatcher->pDataBlocks, pBuf);
|
|
*pContinue = (DS_BUF_LOW == updateStatus(pDispatcher) ? true : false);
|
|
return TSDB_CODE_SUCCESS;
|
|
}
|
|
|
|
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
|
|
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
|
pthread_mutex_lock(&pDispatcher->mutex);
|
|
pDispatcher->queryEnd = true;
|
|
pDispatcher->useconds = useconds;
|
|
pthread_mutex_unlock(&pDispatcher->mutex);
|
|
}
|
|
|
|
static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd) {
|
|
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
|
if (taosQueueEmpty(pDispatcher->pDataBlocks)) {
|
|
*pQueryEnd = pDispatcher->queryEnd;
|
|
*pLen = 0;
|
|
return;
|
|
}
|
|
SDataDispatchBuf* pBuf = NULL;
|
|
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
|
|
memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
|
|
taosFreeQitem(pBuf);
|
|
*pLen = ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen;
|
|
*pQueryEnd = pDispatcher->queryEnd;
|
|
}
|
|
|
|
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
|
|
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
|
if (NULL == pDispatcher->nextOutput.pData) {
|
|
assert(pDispatcher->queryEnd);
|
|
pOutput->useconds = pDispatcher->useconds;
|
|
pOutput->precision = pDispatcher->schema.precision;
|
|
return TSDB_CODE_SUCCESS;
|
|
}
|
|
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData);
|
|
memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
|
|
pOutput->numOfRows = pEntry->numOfRows;
|
|
pOutput->compressed = pEntry->compressed;
|
|
tfree(pDispatcher->nextOutput.pData); // todo persistent
|
|
pOutput->bufStatus = updateStatus(pDispatcher);
|
|
pthread_mutex_lock(&pDispatcher->mutex);
|
|
pOutput->queryEnd = pDispatcher->queryEnd;
|
|
pOutput->useconds = pDispatcher->useconds;
|
|
pOutput->precision = pDispatcher->schema.precision;
|
|
pthread_mutex_unlock(&pDispatcher->mutex);
|
|
return TSDB_CODE_SUCCESS;
|
|
}
|
|
|
|
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
|
|
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
|
tfree(pDispatcher->nextOutput.pData);
|
|
while (!taosQueueEmpty(pDispatcher->pDataBlocks)) {
|
|
SDataDispatchBuf* pBuf = NULL;
|
|
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
|
|
tfree(pBuf->pData);
|
|
taosFreeQitem(pBuf);
|
|
}
|
|
taosCloseQueue(pDispatcher->pDataBlocks);
|
|
pthread_mutex_destroy(&pDispatcher->mutex);
|
|
}
|
|
|
|
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSink* pDataSink, DataSinkHandle* pHandle) {
|
|
SDataDispatchHandle* dispatcher = calloc(1, sizeof(SDataDispatchHandle));
|
|
if (NULL == dispatcher) {
|
|
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
|
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
|
}
|
|
dispatcher->sink.fPut = putDataBlock;
|
|
dispatcher->sink.fEndPut = endPut;
|
|
dispatcher->sink.fGetLen = getDataLength;
|
|
dispatcher->sink.fGetData = getDataBlock;
|
|
dispatcher->sink.fDestroy = destroyDataSinker;
|
|
dispatcher->pManager = pManager;
|
|
dispatcher->schema = pDataSink->schema;
|
|
dispatcher->status = DS_BUF_EMPTY;
|
|
dispatcher->queryEnd = false;
|
|
dispatcher->pDataBlocks = taosOpenQueue();
|
|
pthread_mutex_init(&dispatcher->mutex, NULL);
|
|
if (NULL == dispatcher->pDataBlocks) {
|
|
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
|
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
|
}
|
|
*pHandle = dispatcher;
|
|
return TSDB_CODE_SUCCESS;
|
|
}
|