Merge pull request #9677 from taosdata/feature/3.0_wxy
TD-12760 datasink interface code
This commit is contained in:
commit
e5732c6d3c
|
@ -52,6 +52,8 @@ typedef SSchema SSlotSchema;
|
||||||
typedef struct SDataBlockSchema {
|
typedef struct SDataBlockSchema {
|
||||||
SSlotSchema *pSchema;
|
SSlotSchema *pSchema;
|
||||||
int32_t numOfCols; // number of columns
|
int32_t numOfCols; // number of columns
|
||||||
|
int32_t resultRowSize;
|
||||||
|
int16_t precision;
|
||||||
} SDataBlockSchema;
|
} SDataBlockSchema;
|
||||||
|
|
||||||
typedef struct SQueryNodeBasicInfo {
|
typedef struct SQueryNodeBasicInfo {
|
||||||
|
@ -61,6 +63,7 @@ typedef struct SQueryNodeBasicInfo {
|
||||||
|
|
||||||
typedef struct SDataSink {
|
typedef struct SDataSink {
|
||||||
SQueryNodeBasicInfo info;
|
SQueryNodeBasicInfo info;
|
||||||
|
SDataBlockSchema schema;
|
||||||
} SDataSink;
|
} SDataSink;
|
||||||
|
|
||||||
typedef struct SDataDispatcher {
|
typedef struct SDataDispatcher {
|
||||||
|
|
|
@ -8,5 +8,5 @@ target_include_directories(
|
||||||
|
|
||||||
target_link_libraries(
|
target_link_libraries(
|
||||||
executor
|
executor
|
||||||
PRIVATE os util common function parser
|
PRIVATE os util common function parser planner qcom
|
||||||
)
|
)
|
|
@ -0,0 +1,45 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef _DATA_SINK_INT_H
|
||||||
|
#define _DATA_SINK_INT_H
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#include "common.h"
|
||||||
|
#include "dataSinkMgt.h"
|
||||||
|
|
||||||
|
struct SDataSink;
|
||||||
|
struct SDataSinkHandle;
|
||||||
|
|
||||||
|
typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SDataResult* pRes);
|
||||||
|
typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, char* pData, int32_t* pLen);
|
||||||
|
typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle);
|
||||||
|
|
||||||
|
typedef struct SDataSinkHandle {
|
||||||
|
FPutDataBlock fPut;
|
||||||
|
FGetDataBlock fGet;
|
||||||
|
FDestroyDataSinker fDestroy;
|
||||||
|
} SDataSinkHandle;
|
||||||
|
|
||||||
|
int32_t createDataDispatcher(const struct SDataSink* pDataSink, DataSinkHandle* pHandle);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif /*_DATA_SINK_INT_H*/
|
|
@ -0,0 +1,104 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef _DATA_SINK_MGT_H
|
||||||
|
#define _DATA_SINK_MGT_H
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#include "os.h"
|
||||||
|
#include "executorimpl.h"
|
||||||
|
|
||||||
|
#define DS_CAPACITY_ENOUGH 1
|
||||||
|
#define DS_CAPACITY_FULL 2
|
||||||
|
#define DS_NEED_SCHEDULE 3
|
||||||
|
|
||||||
|
struct SDataSink;
|
||||||
|
struct SSDataBlock;
|
||||||
|
|
||||||
|
typedef struct SDataSinkMgtCfg {
|
||||||
|
uint32_t maxDataBlockNum;
|
||||||
|
uint32_t maxDataBlockNumPerQuery;
|
||||||
|
} SDataSinkMgtCfg;
|
||||||
|
|
||||||
|
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg);
|
||||||
|
|
||||||
|
typedef void* DataSinkHandle;
|
||||||
|
|
||||||
|
typedef struct SDataResult {
|
||||||
|
SQueryCostInfo profile;
|
||||||
|
const SSDataBlock* pData;
|
||||||
|
SHashObj* pTableRetrieveTsMap;
|
||||||
|
} SDataResult;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a subplan's datasinker handle for all later operations.
|
||||||
|
* @param pDataSink
|
||||||
|
* @param pHandle output
|
||||||
|
* @return error code
|
||||||
|
*/
|
||||||
|
int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pHandle);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Put the result set returned by the executor into datasinker.
|
||||||
|
* @param handle
|
||||||
|
* @param pRes
|
||||||
|
* @return error code
|
||||||
|
*/
|
||||||
|
int32_t dsPutDataBlock(DataSinkHandle handle, const SDataResult* pRes);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the length of the data returned by the next call to dsGetDataBlock.
|
||||||
|
* @param handle
|
||||||
|
* @return data length
|
||||||
|
*/
|
||||||
|
int32_t dsGetDataLength(DataSinkHandle handle);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get data, the caller needs to allocate data memory.
|
||||||
|
* @param handle
|
||||||
|
* @param pData output
|
||||||
|
* @param pLen output
|
||||||
|
* @return error code
|
||||||
|
*/
|
||||||
|
int32_t dsGetDataBlock(DataSinkHandle handle, char* pData, int32_t* pLen);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the datasinker state, after each dsPutDataBlock and dsGetDataBlock call.
|
||||||
|
* @param handle
|
||||||
|
* @return datasinker status
|
||||||
|
*/
|
||||||
|
int32_t dsGetStatus(DataSinkHandle handle);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* After dsGetStatus returns DS_NEED_SCHEDULE, the caller need to put this into the work queue.
|
||||||
|
* @param ahandle
|
||||||
|
* @param pItem
|
||||||
|
*/
|
||||||
|
void dsScheduleProcess(void* ahandle, void* pItem);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Destroy the datasinker handle.
|
||||||
|
* @param handle
|
||||||
|
*/
|
||||||
|
void dsDestroyDataSinker(DataSinkHandle handle);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif /*_DATA_SINK_MGT_H*/
|
|
@ -0,0 +1,140 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "dataSinkInt.h"
|
||||||
|
#include "dataSinkMgt.h"
|
||||||
|
#include "planner.h"
|
||||||
|
#include "tcompression.h"
|
||||||
|
#include "tglobal.h"
|
||||||
|
#include "tqueue.h"
|
||||||
|
|
||||||
|
#define GET_BUF_DATA(buf) (buf)->pData + (buf)->pos
|
||||||
|
#define GET_BUF_REMAIN(buf) (buf)->remain
|
||||||
|
|
||||||
|
typedef struct SBuf {
|
||||||
|
int32_t size;
|
||||||
|
int32_t pos;
|
||||||
|
int32_t remain;
|
||||||
|
char* pData;
|
||||||
|
} SBuf;
|
||||||
|
|
||||||
|
typedef struct SDataDispatchHandle {
|
||||||
|
SDataSinkHandle sink;
|
||||||
|
SDataBlockSchema schema;
|
||||||
|
STaosQueue* pDataBlocks;
|
||||||
|
SBuf buf;
|
||||||
|
} SDataDispatchHandle;
|
||||||
|
|
||||||
|
static bool needCompress(const SSDataBlock* pData, const SDataBlockSchema* pSchema) {
|
||||||
|
if (tsCompressColData < 0 || 0 == pData->info.rows) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t col = 0; col < pSchema->numOfCols; ++col) {
|
||||||
|
SColumnInfoData* pColRes = taosArrayGet(pData->pDataBlock, col);
|
||||||
|
int32_t colSize = pColRes->info.bytes * pData->info.rows;
|
||||||
|
if (NEEDTO_COMPRESS_QUERY(colSize)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t compressQueryColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) {
|
||||||
|
int32_t colSize = pColRes->info.bytes * numOfRows;
|
||||||
|
return (*(tDataTypes[pColRes->info.type].compFunc))(
|
||||||
|
pColRes->pData, colSize, numOfRows, data, colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void doCopyQueryResultToMsg(const SDataResult* pRes, const SDataBlockSchema* pSchema, char* data, int8_t compressed, int32_t *compLen) {
|
||||||
|
int32_t *compSizes = (int32_t*)data;
|
||||||
|
if (compressed) {
|
||||||
|
data += pSchema->numOfCols * sizeof(int32_t);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t col = 0; col < pSchema->numOfCols; ++col) {
|
||||||
|
SColumnInfoData* pColRes = taosArrayGet(pRes->pData->pDataBlock, col);
|
||||||
|
if (compressed) {
|
||||||
|
compSizes[col] = compressQueryColData(pColRes, pRes->pData->info.rows, data, compressed);
|
||||||
|
data += compSizes[col];
|
||||||
|
*compLen += compSizes[col];
|
||||||
|
compSizes[col] = htonl(compSizes[col]);
|
||||||
|
} else {
|
||||||
|
memmove(data, pColRes->pData, pColRes->info.bytes * pRes->pData->info.rows);
|
||||||
|
data += pColRes->info.bytes * pRes->pData->info.rows;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t numOfTables = (int32_t) taosHashGetSize(pRes->pTableRetrieveTsMap);
|
||||||
|
*(int32_t*)data = htonl(numOfTables);
|
||||||
|
data += sizeof(int32_t);
|
||||||
|
|
||||||
|
STableIdInfo* item = taosHashIterate(pRes->pTableRetrieveTsMap, NULL);
|
||||||
|
while (item) {
|
||||||
|
STableIdInfo* pDst = (STableIdInfo*)data;
|
||||||
|
pDst->uid = htobe64(item->uid);
|
||||||
|
pDst->key = htobe64(item->key);
|
||||||
|
data += sizeof(STableIdInfo);
|
||||||
|
item = taosHashIterate(pRes->pTableRetrieveTsMap, item);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void toRetrieveResult(SDataDispatchHandle* pHandle, const SDataResult* pRes, char* pData, int32_t* pContLen) {
|
||||||
|
SRetrieveTableRsp* pRsp = (SRetrieveTableRsp*)pData;
|
||||||
|
pRsp->useconds = htobe64(pRes->profile.elapsedTime);
|
||||||
|
pRsp->precision = htons(pHandle->schema.precision);
|
||||||
|
pRsp->compressed = (int8_t)needCompress(pRes->pData, &(pHandle->schema));
|
||||||
|
pRsp->numOfRows = htonl(pRes->pData->info.rows);
|
||||||
|
|
||||||
|
*pContLen = sizeof(int32_t) + sizeof(STableIdInfo) * taosHashGetSize(pRes->pTableRetrieveTsMap) + sizeof(SRetrieveTableRsp);
|
||||||
|
doCopyQueryResultToMsg(pRes, &pHandle->schema, pRsp->data, pRsp->compressed, &pRsp->compLen);
|
||||||
|
*pContLen += (pRsp->compressed ? pRsp->compLen : pHandle->schema.resultRowSize * pRes->pData->info.rows);
|
||||||
|
|
||||||
|
pRsp->compLen = htonl(pRsp->compLen);
|
||||||
|
// todo completed
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SDataResult* pRes) {
|
||||||
|
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
||||||
|
int32_t useSize = 0;
|
||||||
|
toRetrieveResult(pDispatcher, pRes, GET_BUF_DATA(&pDispatcher->buf), &useSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t getDataBlock(SDataSinkHandle* pHandle, char* pData, int32_t* pLen) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t createDataDispatcher(const SDataSink* pDataSink, DataSinkHandle* pHandle) {
|
||||||
|
SDataDispatchHandle* dispatcher = calloc(1, sizeof(SDataDispatchHandle));
|
||||||
|
if (NULL == dispatcher) {
|
||||||
|
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
dispatcher->sink.fPut = putDataBlock;
|
||||||
|
dispatcher->sink.fGet = getDataBlock;
|
||||||
|
dispatcher->sink.fDestroy = destroyDataSinker;
|
||||||
|
dispatcher->pDataBlocks = taosOpenQueue();
|
||||||
|
if (NULL == dispatcher->pDataBlocks) {
|
||||||
|
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
*pHandle = dispatcher;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
|
@ -0,0 +1,56 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "dataSinkMgt.h"
|
||||||
|
#include "dataSinkInt.h"
|
||||||
|
#include "planner.h"
|
||||||
|
|
||||||
|
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg) {
|
||||||
|
// todo
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pHandle) {
|
||||||
|
if (DSINK_Dispatch == pDataSink->info.type) {
|
||||||
|
return createDataDispatcher(pDataSink, pHandle);
|
||||||
|
}
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t dsPutDataBlock(DataSinkHandle handle, const SDataResult* pRes) {
|
||||||
|
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
|
||||||
|
return pHandleImpl->fPut(pHandleImpl, pRes);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t dsGetDataLength(DataSinkHandle handle) {
|
||||||
|
// todo
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t dsGetDataBlock(DataSinkHandle handle, char* pData, int32_t* pLen) {
|
||||||
|
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
|
||||||
|
return pHandleImpl->fGet(pHandleImpl, pData, pLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t dsGetStatus(DataSinkHandle handle) {
|
||||||
|
// todo
|
||||||
|
}
|
||||||
|
|
||||||
|
void dsScheduleProcess(void* ahandle, void* pItem) {
|
||||||
|
// todo
|
||||||
|
}
|
||||||
|
|
||||||
|
void dsDestroyDataSinker(DataSinkHandle handle) {
|
||||||
|
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
|
||||||
|
pHandleImpl->fDestroy(pHandleImpl);
|
||||||
|
}
|
|
@ -624,7 +624,7 @@ int32_t parseInsertSql(SParseContext* pContext, SVnodeModifOpStmtInfo** pInfo) {
|
||||||
|
|
||||||
if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pOutput) {
|
if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pOutput) {
|
||||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
*pInfo = context.pOutput;
|
*pInfo = context.pOutput;
|
||||||
|
@ -637,5 +637,5 @@ int32_t parseInsertSql(SParseContext* pContext, SVnodeModifOpStmtInfo** pInfo) {
|
||||||
}
|
}
|
||||||
destroyInsertParseContext(&context);
|
destroyInsertParseContext(&context);
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return (TSDB_CODE_SUCCESS == code ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED);
|
return code;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue