sync home and office
This commit is contained in:
parent
6f5f6896a5
commit
3f62f8a333
|
@ -22,6 +22,8 @@ extern "C" {
|
||||||
|
|
||||||
//======================================================================================
|
//======================================================================================
|
||||||
//begin API to taosd and qworker
|
//begin API to taosd and qworker
|
||||||
|
#define TSDB_UDF_MAX_COLUMNS 4
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
UDFC_CODE_STOPPING = -1,
|
UDFC_CODE_STOPPING = -1,
|
||||||
UDFC_CODE_RESTARTING = -2,
|
UDFC_CODE_RESTARTING = -2,
|
||||||
|
@ -49,15 +51,22 @@ enum {
|
||||||
TSDB_UDF_SCRIPT_LUA = 1,
|
TSDB_UDF_SCRIPT_LUA = 1,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
typedef struct SUdfColumnMeta {
|
||||||
|
int16_t type;
|
||||||
|
int32_t bytes; // <0 var length, others fixed length bytes
|
||||||
|
uint8_t precision;
|
||||||
|
uint8_t scale;
|
||||||
|
} SUdfColumnMeta;
|
||||||
|
|
||||||
typedef struct SUdfInfo {
|
typedef struct SUdfInfo {
|
||||||
char *udfName; // function name
|
char *udfName; // function name
|
||||||
int32_t udfType; // scalar function or aggregate function
|
int32_t udfType; // scalar function or aggregate function
|
||||||
int8_t scriptType;
|
int8_t scriptType;
|
||||||
char *path;
|
char *path;
|
||||||
|
|
||||||
int8_t resType; // result type
|
// known info between qworker and udf
|
||||||
int16_t resBytes; // result byte
|
// struct SUdfColumnMeta resultMeta;
|
||||||
int32_t bufSize; //interbuf size
|
// int32_t bufSize; //interbuf size
|
||||||
|
|
||||||
} SUdfInfo;
|
} SUdfInfo;
|
||||||
|
|
||||||
|
@ -72,33 +81,50 @@ typedef void *UdfHandle;
|
||||||
int32_t setupUdf(SUdfInfo* udf, UdfHandle *handle);
|
int32_t setupUdf(SUdfInfo* udf, UdfHandle *handle);
|
||||||
|
|
||||||
|
|
||||||
enum {
|
typedef struct SUdfColumnData {
|
||||||
TSDB_UDF_STEP_NORMAL = 0,
|
int32_t numOfRows;
|
||||||
TSDB_UDF_STEP_MERGE,
|
bool varLengthColumn;
|
||||||
TSDb_UDF_STEP_FINALIZE,
|
union {
|
||||||
TSDB_UDF_STEP_MAX_NUM
|
int32_t nullBitmapLen;
|
||||||
};
|
char* nullBitmap;
|
||||||
/**
|
int32_t dataLen;
|
||||||
* call udf
|
char* data;
|
||||||
* @param handle udf handle
|
};
|
||||||
* @param step
|
|
||||||
* @param state
|
union {
|
||||||
* @param stateSize
|
int32_t varOffsetsLen;
|
||||||
* @param input
|
char* varOffsets;
|
||||||
* @param newstate
|
int32_t payloadLen;
|
||||||
* @param newStateSize
|
char* payload;
|
||||||
* @param output
|
};
|
||||||
* @return error code
|
} SUdfColumnData;
|
||||||
*/
|
|
||||||
|
|
||||||
|
typedef struct SUdfColumn {
|
||||||
|
SUdfColumnMeta colMeta;
|
||||||
|
SUdfColumnData colData;
|
||||||
|
} SUdfColumn;
|
||||||
|
|
||||||
//TODO: must change the following after metadata flow and data flow between qworker and udfd is well defined
|
|
||||||
typedef struct SUdfDataBlock {
|
typedef struct SUdfDataBlock {
|
||||||
char* data;
|
int32_t numOfRows;
|
||||||
int32_t size;
|
int32_t numOfCols;
|
||||||
|
SUdfColumn udfCols[TSDB_UDF_MAX_COLUMNS];
|
||||||
} SUdfDataBlock;
|
} SUdfDataBlock;
|
||||||
|
|
||||||
int32_t callUdf(UdfHandle handle, int8_t step, char *state, int32_t stateSize, SUdfDataBlock input, char **newstate,
|
typedef struct SUdfInterBuf {
|
||||||
int32_t *newStateSize, SUdfDataBlock *output);
|
int32_t bufLen;
|
||||||
|
char* buf;
|
||||||
|
} SUdfInterBuf;
|
||||||
|
|
||||||
|
// input: block, initFirst
|
||||||
|
// output: interbuf
|
||||||
|
int32_t callUdfAggProcess(SUdfDataBlock block, SUdfInterBuf *interBuf, bool initFirst);
|
||||||
|
// input: interBuf
|
||||||
|
// output: resultData
|
||||||
|
int32_t callUdfAggFinalize(SUdfInterBuf interBuf, SUdfColumnData* resultData);
|
||||||
|
// input: block
|
||||||
|
// output: resultData
|
||||||
|
int32_t callUdfScalaProcess(SUdfDataBlock block, SUdfColumnData* resultData);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* tearn down udf
|
* tearn down udf
|
||||||
|
@ -109,30 +135,16 @@ int32_t teardownUdf(UdfHandle handle);
|
||||||
|
|
||||||
// end API to taosd and qworker
|
// end API to taosd and qworker
|
||||||
//=============================================================================================================================
|
//=============================================================================================================================
|
||||||
// TODO: Must change
|
|
||||||
// begin API to UDF writer.
|
// begin API to UDF writer.
|
||||||
|
|
||||||
// script
|
// dynamic lib init and destroy
|
||||||
|
|
||||||
//typedef int32_t (*scriptInitFunc)(void* pCtx);
|
|
||||||
//typedef void (*scriptNormalFunc)(void* pCtx, char* data, int16_t iType, int16_t iBytes, int32_t numOfRows,
|
|
||||||
// int64_t* ptList, int64_t key, char* dataOutput, char* tsOutput, int32_t* numOfOutput,
|
|
||||||
// int16_t oType, int16_t oBytes);
|
|
||||||
//typedef void (*scriptFinalizeFunc)(void* pCtx, int64_t key, char* dataOutput, int32_t* numOfOutput);
|
|
||||||
//typedef void (*scriptMergeFunc)(void* pCtx, char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput);
|
|
||||||
//typedef void (*scriptDestroyFunc)(void* pCtx);
|
|
||||||
|
|
||||||
// dynamic lib
|
|
||||||
typedef int32_t (*TUdfInitFunc)();
|
typedef int32_t (*TUdfInitFunc)();
|
||||||
typedef void (*TUdfDestroyFunc)();
|
typedef int32_t (*TUdfDestroyFunc)();
|
||||||
|
|
||||||
typedef void (*TUdfFunc)(int8_t step,
|
|
||||||
char *state, int32_t stateSize, SUdfDataBlock input,
|
|
||||||
char **newstate, int32_t *newStateSize, SUdfDataBlock *output);
|
|
||||||
|
|
||||||
//typedef void (*udfMergeFunc)(char *data, int32_t numOfRows, char *dataOutput, int32_t* numOfOutput);
|
|
||||||
//typedef void (*udfFinalizeFunc)(char* state, int32_t stateSize, SUdfDataBlock *output);
|
|
||||||
|
|
||||||
|
typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock block, SUdfColumnData *resultData);
|
||||||
|
typedef int32_t (*TUdfAggInit)(SUdfInterBuf *buf);
|
||||||
|
typedef int32_t (*TUdfAggProcess)(SUdfDataBlock block, SUdfInterBuf *interBuf);
|
||||||
|
typedef int32_t (*TUdfAggFinalize)(SUdfInterBuf buf, SUdfColumnData *resultData);
|
||||||
// end API to UDF writer
|
// end API to UDF writer
|
||||||
//=======================================================================================================================
|
//=======================================================================================================================
|
||||||
|
|
||||||
|
|
|
@ -30,6 +30,12 @@ enum {
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
enum {
|
||||||
|
TSDB_UDF_CALL_AGG_PROC = 0,
|
||||||
|
TSDb_UDF_CALL_AGG_FIN,
|
||||||
|
TSDB_UDF_CALL_SCALA_PROC,
|
||||||
|
};
|
||||||
|
|
||||||
typedef struct SUdfSetupRequest {
|
typedef struct SUdfSetupRequest {
|
||||||
char udfName[16]; //
|
char udfName[16]; //
|
||||||
int8_t scriptType; // 0:c, 1: lua, 2:js
|
int8_t scriptType; // 0:c, 1: lua, 2:js
|
||||||
|
@ -42,24 +48,18 @@ typedef struct SUdfSetupResponse {
|
||||||
int64_t udfHandle;
|
int64_t udfHandle;
|
||||||
} SUdfSetupResponse;
|
} SUdfSetupResponse;
|
||||||
|
|
||||||
|
|
||||||
typedef struct SUdfCallRequest {
|
typedef struct SUdfCallRequest {
|
||||||
int64_t udfHandle;
|
int64_t udfHandle;
|
||||||
int8_t step;
|
int8_t callType;
|
||||||
|
|
||||||
int32_t inputBytes;
|
SUdfDataBlock block;
|
||||||
char *input;
|
SUdfInterBuf interBuf;
|
||||||
|
bool initFirst;
|
||||||
int32_t stateBytes;
|
|
||||||
char *state;
|
|
||||||
} SUdfCallRequest;
|
} SUdfCallRequest;
|
||||||
|
|
||||||
|
|
||||||
typedef struct SUdfCallResponse {
|
typedef struct SUdfCallResponse {
|
||||||
int32_t outputBytes;
|
SUdfColumnData resultData;
|
||||||
char *output;
|
SUdfInterBuf interBuf;
|
||||||
int32_t newStateBytes;
|
|
||||||
char *newState;
|
|
||||||
} SUdfCallResponse;
|
} SUdfCallResponse;
|
||||||
|
|
||||||
|
|
||||||
|
@ -76,7 +76,11 @@ typedef struct SUdfRequest {
|
||||||
int64_t seqNum;
|
int64_t seqNum;
|
||||||
|
|
||||||
int8_t type;
|
int8_t type;
|
||||||
void *subReq;
|
union {
|
||||||
|
SUdfSetupRequest setup;
|
||||||
|
SUdfCallRequest call;
|
||||||
|
SUdfTeardownRequest teardown;
|
||||||
|
};
|
||||||
} SUdfRequest;
|
} SUdfRequest;
|
||||||
|
|
||||||
typedef struct SUdfResponse {
|
typedef struct SUdfResponse {
|
||||||
|
@ -85,13 +89,17 @@ typedef struct SUdfResponse {
|
||||||
|
|
||||||
int8_t type;
|
int8_t type;
|
||||||
int32_t code;
|
int32_t code;
|
||||||
void *subRsp;
|
union {
|
||||||
|
SUdfSetupResponse setupRsp;
|
||||||
|
SUdfCallResponse callRsp;
|
||||||
|
SUdfTeardownResponse teardownRsp;
|
||||||
|
};
|
||||||
} SUdfResponse;
|
} SUdfResponse;
|
||||||
|
|
||||||
int32_t decodeRequest(char *buf, int32_t bufLen, SUdfRequest **pRequest);
|
int32_t decodeRequest(char *buf, int32_t bufLen, SUdfRequest *pRequest);
|
||||||
int32_t encodeResponse(char **buf, int32_t *bufLen, SUdfResponse *response);
|
|
||||||
int32_t encodeRequest(char **buf, int32_t *bufLen, SUdfRequest *request);
|
int32_t encodeRequest(char **buf, int32_t *bufLen, SUdfRequest *request);
|
||||||
int32_t decodeResponse(char *buf, int32_t bufLen, SUdfResponse **pResponse);
|
int32_t decodeResponse(char *buf, int32_t bufLen, SUdfResponse *pResponse);
|
||||||
|
int32_t encodeResponse(char **buf, int32_t *bufLen, SUdfResponse *response);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,10 +20,16 @@
|
||||||
#include "tarray.h"
|
#include "tarray.h"
|
||||||
|
|
||||||
//TODO: when startup, set thread poll size. add it to cfg
|
//TODO: when startup, set thread poll size. add it to cfg
|
||||||
|
//TODO: test for udfd restart
|
||||||
//TODO: udfd restart when exist or aborts
|
//TODO: udfd restart when exist or aborts
|
||||||
|
//TODO: deal with uv task that has been started and then udfd core dumped
|
||||||
//TODO: network error processing.
|
//TODO: network error processing.
|
||||||
//TODO: add unit test
|
//TODO: add unit test
|
||||||
//TODO: test libuv queue
|
//TODO: include all global variable under context struct
|
||||||
|
/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
|
||||||
|
* The QUEUE is copied from queue.h under libuv
|
||||||
|
* */
|
||||||
|
|
||||||
typedef void *QUEUE[2];
|
typedef void *QUEUE[2];
|
||||||
|
|
||||||
/* Private macros. */
|
/* Private macros. */
|
||||||
|
@ -205,104 +211,331 @@ int8_t gUdfcState = UDFC_STATE_INITAL;
|
||||||
|
|
||||||
QUEUE gUdfTaskQueue = {0};
|
QUEUE gUdfTaskQueue = {0};
|
||||||
|
|
||||||
//TODO: deal with uv task that has been started and then udfd core dumped
|
|
||||||
QUEUE gUvProcTaskQueue = {0};
|
QUEUE gUvProcTaskQueue = {0};
|
||||||
|
|
||||||
int32_t encodeRequest(char **pBuf, int32_t *pBufLen, SUdfRequest *request) {
|
int32_t serializeUdfColumnData(SUdfColumnData* data, char* pBuf) {
|
||||||
debugPrint("%s", "encoding request");
|
char* bufBeg = pBuf;
|
||||||
|
*(int32_t*)pBuf = data->numOfRows;
|
||||||
|
pBuf += sizeof(int32_t);
|
||||||
|
*(bool*)pBuf = data->varLengthColumn;
|
||||||
|
pBuf += sizeof(bool);
|
||||||
|
|
||||||
int len = sizeof(SUdfRequest) - sizeof(void *);
|
if (!data->varLengthColumn) {
|
||||||
switch (request->type) {
|
*(int32_t*)pBuf = data->nullBitmapLen;
|
||||||
case UDF_TASK_SETUP: {
|
pBuf += sizeof(int32_t);
|
||||||
SUdfSetupRequest *setup = (SUdfSetupRequest *) (request->subReq);
|
memcpy(pBuf, data->nullBitmap, data->nullBitmapLen);
|
||||||
len += sizeof(SUdfSetupRequest) - 1 * sizeof(char *) + setup->pathSize;
|
pBuf += data->nullBitmapLen;
|
||||||
break;
|
*(int32_t*)pBuf = data->dataLen;
|
||||||
}
|
pBuf += sizeof(int32_t);
|
||||||
case UDF_TASK_CALL: {
|
memcpy(pBuf, data->data, data->dataLen);
|
||||||
SUdfCallRequest *call = (SUdfCallRequest *) (request->subReq);
|
pBuf += data->dataLen;
|
||||||
len += sizeof(SUdfCallRequest) - 2 * sizeof(char *) + call->inputBytes + call->stateBytes;
|
} else {
|
||||||
break;
|
*(int32_t*)pBuf = data->varOffsetsLen;
|
||||||
}
|
pBuf += sizeof(int32_t);
|
||||||
case UDF_TASK_TEARDOWN: {
|
memcpy(pBuf, data->varOffsets, data->varOffsetsLen);
|
||||||
SUdfTeardownRequest *teardown = (SUdfTeardownRequest *) (request->subReq);
|
pBuf += data->varOffsetsLen;
|
||||||
len += sizeof(SUdfTeardownRequest);
|
*(int32_t*)pBuf = data->payloadLen;
|
||||||
break;
|
pBuf += sizeof(int32_t);
|
||||||
}
|
memcpy(pBuf, data->payload, data->payloadLen);
|
||||||
default:
|
pBuf += data->payloadLen;
|
||||||
break;
|
}
|
||||||
|
int32_t len = pBuf - bufBeg;
|
||||||
|
return len;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t deserializeUdfColumnData(SUdfColumnData* pData, char* buf) {
|
||||||
|
char* bufBeg = buf;
|
||||||
|
pData->numOfRows = *(int32_t*)buf;
|
||||||
|
buf += sizeof(int32_t);
|
||||||
|
pData->varLengthColumn = *(bool*)buf;
|
||||||
|
buf += sizeof(bool);
|
||||||
|
|
||||||
|
if (!pData->varLengthColumn) {
|
||||||
|
pData->nullBitmapLen = *(int32_t*)buf;
|
||||||
|
buf += sizeof(int32_t);
|
||||||
|
|
||||||
|
pData->nullBitmap = buf;
|
||||||
|
buf += pData->nullBitmapLen;
|
||||||
|
|
||||||
|
pData->dataLen = *(int32_t*)buf;
|
||||||
|
buf += sizeof(int32_t);
|
||||||
|
|
||||||
|
pData->data = buf;
|
||||||
|
buf += pData->dataLen;
|
||||||
|
} else {
|
||||||
|
pData->varOffsetsLen = *(int32_t*)buf;
|
||||||
|
buf += sizeof(int32_t);
|
||||||
|
|
||||||
|
pData->varOffsets = buf;
|
||||||
|
buf += pData->varOffsetsLen;
|
||||||
|
|
||||||
|
pData->payloadLen = *(int32_t*)buf;
|
||||||
|
buf += sizeof(int32_t);
|
||||||
|
|
||||||
|
pData->payload = buf;
|
||||||
|
buf += pData->payloadLen;
|
||||||
|
}
|
||||||
|
int32_t len = buf - bufBeg;
|
||||||
|
return len;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t serializeUdfColumnMeta(SUdfColumnMeta *meta, char* pBuf) {
|
||||||
|
char* bufBeg = pBuf;
|
||||||
|
memcpy(pBuf, meta, sizeof(SUdfColumnMeta));
|
||||||
|
pBuf += sizeof(SUdfColumnMeta);
|
||||||
|
|
||||||
|
int32_t len = pBuf - bufBeg;
|
||||||
|
return len;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t deserializeUdfColumnMeta(SUdfColumnMeta *pMeta, char* buf) {
|
||||||
|
char *bufBegin = buf;
|
||||||
|
memcpy(pMeta, buf, sizeof(SUdfColumnMeta));
|
||||||
|
buf += sizeof(SUdfColumnMeta);
|
||||||
|
|
||||||
|
int32_t len = buf - bufBegin;
|
||||||
|
return len;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t serializeUdfColumn(SUdfColumn *udfCol, char *pBuf) {
|
||||||
|
char *bufBegin = pBuf;
|
||||||
|
|
||||||
|
int32_t len = serializeUdfColumnMeta(&udfCol->colMeta, pBuf);
|
||||||
|
pBuf += len;
|
||||||
|
|
||||||
|
len = serializeUdfColumnData(&udfCol->colData, pBuf);
|
||||||
|
pBuf += len;
|
||||||
|
|
||||||
|
int32_t totalLen = pBuf - bufBegin;
|
||||||
|
return totalLen;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t deserializeUdfColumn(SUdfColumn *pUdfCol, char *buf) {
|
||||||
|
char *bufBegin = buf;
|
||||||
|
|
||||||
|
int32_t len = deserializeUdfColumnMeta(&pUdfCol->colMeta, buf);
|
||||||
|
buf += len;
|
||||||
|
|
||||||
|
len = deserializeUdfColumnData(&pUdfCol->colData, buf);
|
||||||
|
buf += len;
|
||||||
|
|
||||||
|
int32_t totalLen = buf - bufBegin;
|
||||||
|
return totalLen;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t serializeUdfDataBlock(SUdfDataBlock *block, char *pBuf) {
|
||||||
|
char *bufBegin = pBuf;
|
||||||
|
|
||||||
|
*(int32_t*)pBuf = block->numOfRows;
|
||||||
|
pBuf += sizeof(int32_t);
|
||||||
|
|
||||||
|
*(int32_t*)pBuf = block->numOfCols;
|
||||||
|
pBuf += sizeof(int32_t);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < block->numOfCols; ++i) {
|
||||||
|
SUdfColumn* col = block->udfCols + i;
|
||||||
|
int32_t l = serializeUdfColumn(col, pBuf);
|
||||||
|
pBuf += l;
|
||||||
}
|
}
|
||||||
|
|
||||||
char *bufBegin = taosMemoryMalloc(len);
|
int32_t totalLen = pBuf - bufBegin;
|
||||||
char *buf = bufBegin;
|
return totalLen;
|
||||||
|
}
|
||||||
|
|
||||||
//skip msgLen first
|
int32_t deserailizeUdfDataBlock(SUdfDataBlock *pBlock, char *buf) {
|
||||||
|
char *bufBegin = buf;
|
||||||
|
|
||||||
|
pBlock->numOfRows = *(int32_t*)buf;
|
||||||
buf += sizeof(int32_t);
|
buf += sizeof(int32_t);
|
||||||
|
|
||||||
*(int64_t *) buf = request->seqNum;
|
pBlock->numOfCols = *(int32_t*)buf;
|
||||||
buf += sizeof(int64_t);
|
buf += sizeof(int32_t);
|
||||||
*(int8_t *) buf = request->type;
|
|
||||||
buf += sizeof(int8_t);
|
|
||||||
|
|
||||||
switch (request->type) {
|
for (int32_t i = 0; i < pBlock->numOfCols; ++i) {
|
||||||
case UDF_TASK_SETUP: {
|
int32_t l = deserializeUdfColumn(pBlock->udfCols + i, buf);
|
||||||
SUdfSetupRequest *setup = (SUdfSetupRequest *) (request->subReq);
|
buf += l;
|
||||||
memcpy(buf, setup->udfName, 16);
|
|
||||||
buf += 16;
|
|
||||||
*(int8_t *) buf = setup->scriptType;
|
|
||||||
buf += sizeof(int8_t);
|
|
||||||
*(int8_t *) buf = setup->udfType;
|
|
||||||
buf += sizeof(int8_t);
|
|
||||||
*(int16_t *) buf = setup->pathSize;
|
|
||||||
buf += sizeof(int16_t);
|
|
||||||
memcpy(buf, setup->path, setup->pathSize);
|
|
||||||
buf += setup->pathSize;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
case UDF_TASK_CALL: {
|
|
||||||
SUdfCallRequest *call = (SUdfCallRequest *) (request->subReq);
|
|
||||||
*(int64_t *) buf = call->udfHandle;
|
|
||||||
buf += sizeof(int64_t);
|
|
||||||
*(int8_t *) buf = call->step;
|
|
||||||
buf += sizeof(int8_t);
|
|
||||||
*(int32_t *) buf = call->inputBytes;
|
|
||||||
buf += sizeof(int32_t);
|
|
||||||
memcpy(buf, call->input, call->inputBytes);
|
|
||||||
buf += call->inputBytes;
|
|
||||||
*(int32_t *) buf = call->stateBytes;
|
|
||||||
buf += sizeof(int32_t);
|
|
||||||
memcpy(buf, call->state, call->stateBytes);
|
|
||||||
buf += call->stateBytes;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
case UDF_TASK_TEARDOWN: {
|
|
||||||
SUdfTeardownRequest *teardown = (SUdfTeardownRequest *) (request->subReq);
|
|
||||||
*(int64_t *) buf = teardown->udfHandle;
|
|
||||||
buf += sizeof(int64_t);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
request->msgLen = buf - bufBegin;
|
int32_t totalLen = buf - bufBegin;
|
||||||
*(int32_t *) bufBegin = request->msgLen;
|
return totalLen;
|
||||||
*pBuf = bufBegin;
|
}
|
||||||
*pBufLen = request->msgLen;
|
|
||||||
debugPrint("\tLen: estimate: %d, actual:%d", len, *pBufLen);
|
int32_t serializeUdfInterBuf(SUdfInterBuf *state, char *pBuf) {
|
||||||
|
char *bufBegin = pBuf;
|
||||||
|
|
||||||
|
*(int32_t*)pBuf = state->bufLen;
|
||||||
|
pBuf += sizeof(int32_t);
|
||||||
|
|
||||||
|
memcpy(pBuf, state->buf, state->bufLen);
|
||||||
|
pBuf += state->bufLen;
|
||||||
|
|
||||||
|
return pBuf-bufBegin;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t deserializeUdfInterBuf(SUdfInterBuf *pState, char *buf) {
|
||||||
|
char* bufBegin = buf;
|
||||||
|
pState->bufLen = *(int32_t*)buf;
|
||||||
|
buf += sizeof(int32_t);
|
||||||
|
pState->buf = buf;
|
||||||
|
buf += pState->bufLen;
|
||||||
|
return buf - bufBegin;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t serializeUdfSetupRequest(SUdfSetupRequest *setup, char *buf) {
|
||||||
|
char *bufBegin = buf;
|
||||||
|
|
||||||
|
memcpy(buf, setup->udfName, 16);
|
||||||
|
buf += 16;
|
||||||
|
*(int8_t *) buf = setup->scriptType;
|
||||||
|
buf += sizeof(int8_t);
|
||||||
|
*(int8_t *) buf = setup->udfType;
|
||||||
|
buf += sizeof(int8_t);
|
||||||
|
*(int16_t *) buf = setup->pathSize;
|
||||||
|
buf += sizeof(int16_t);
|
||||||
|
memcpy(buf, setup->path, setup->pathSize);
|
||||||
|
buf += setup->pathSize;
|
||||||
|
|
||||||
|
return buf - bufBegin;
|
||||||
|
};
|
||||||
|
|
||||||
|
int32_t deserializeUdfSetupRequest(SUdfSetupRequest *setup, char *buf) {
|
||||||
|
char* bufBegin = buf;
|
||||||
|
|
||||||
|
memcpy(setup->udfName, buf, 16);
|
||||||
|
buf += 16;
|
||||||
|
setup->scriptType = *(int8_t *) buf;
|
||||||
|
buf += sizeof(int8_t);
|
||||||
|
setup->udfType = *(int8_t *) buf;
|
||||||
|
buf += sizeof(int8_t);
|
||||||
|
setup->pathSize = *(int16_t *) buf;
|
||||||
|
buf += sizeof(int16_t);
|
||||||
|
setup->path = buf;
|
||||||
|
buf += setup->pathSize;
|
||||||
|
|
||||||
|
return buf - bufBegin;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t serializeUdfTeardownRequest(SUdfTeardownRequest *teardown, char *buf) {
|
||||||
|
char* bufBegin = buf;
|
||||||
|
*(int64_t *) buf = teardown->udfHandle;
|
||||||
|
buf += sizeof(int64_t);
|
||||||
|
|
||||||
|
return buf - bufBegin;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t deserializeUdfTeardownRequest(SUdfTeardownRequest *teardown, char *buf) {
|
||||||
|
char *bufBegin = buf;
|
||||||
|
teardown->udfHandle = *(int64_t *) buf;
|
||||||
|
buf += sizeof(int64_t);
|
||||||
|
return buf - bufBegin;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t serializeUdfSetupResponse(SUdfSetupResponse *setupRsp, char *buf) {
|
||||||
|
char *bufBegin = buf;
|
||||||
|
|
||||||
|
*(int64_t *) buf = setupRsp->udfHandle;
|
||||||
|
buf += sizeof(int64_t);
|
||||||
|
|
||||||
|
return buf-bufBegin;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t deserializeUdfSetupResponse(SUdfSetupResponse *setupRsp, char *buf) {
|
||||||
|
char *bufBegin = buf;
|
||||||
|
setupRsp->udfHandle = *(int64_t *) buf;
|
||||||
|
buf += sizeof(int64_t);
|
||||||
|
return buf-bufBegin;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t serializeUdfTeardownResponse(SUdfTeardownResponse *teardownRsp, char *buf) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t decodeRequest(char *bufMsg, int32_t bufLen, SUdfRequest **pRequest) {
|
int32_t deserializeUdfTeardownResponse(SUdfTeardownResponse *teardownRsp, char *buf) {
|
||||||
debugPrint("%s", "decoding request");
|
return 0;
|
||||||
if (*(int32_t *) bufMsg != bufLen) {
|
}
|
||||||
debugPrint("%s", "decoding request error");
|
|
||||||
return -1;
|
int32_t serializeUdfCallRequest(SUdfCallRequest *call, char *buf) {
|
||||||
}
|
char* bufBegin = buf;
|
||||||
char *buf = bufMsg;
|
*(int64_t *) buf = call->udfHandle;
|
||||||
SUdfRequest *request = taosMemoryMalloc(sizeof(SUdfRequest));
|
buf += sizeof(int64_t);
|
||||||
request->subReq = NULL;
|
*(int8_t *) buf = call->callType;
|
||||||
|
buf += sizeof(int8_t);
|
||||||
|
int32_t l = 0;
|
||||||
|
l = serializeUdfDataBlock(&call->block, buf);
|
||||||
|
buf += l;
|
||||||
|
l = serializeUdfInterBuf(&call->interBuf, buf);
|
||||||
|
buf += l;
|
||||||
|
|
||||||
|
*(bool*)buf = call->initFirst;
|
||||||
|
buf += sizeof(bool);
|
||||||
|
|
||||||
|
return buf - bufBegin;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t deserializeUdfCallRequest(SUdfCallRequest *call, char *buf) {
|
||||||
|
char* bufBegin = buf;
|
||||||
|
call->udfHandle = *(int64_t *) buf;
|
||||||
|
buf += sizeof(int64_t);
|
||||||
|
call->callType = *(int8_t *) buf;
|
||||||
|
buf += sizeof(int8_t);
|
||||||
|
int32_t l = 0;
|
||||||
|
l = deserailizeUdfDataBlock(&call->block, buf);
|
||||||
|
buf += l;
|
||||||
|
l = deserializeUdfInterBuf(&call->interBuf, buf);
|
||||||
|
buf += l;
|
||||||
|
call->initFirst = *(bool*)buf;
|
||||||
|
buf += sizeof(bool);
|
||||||
|
return buf - bufBegin;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t serializeUdfCallResponse(SUdfCallResponse *callRsp, char * buf) {
|
||||||
|
char *bufBegin = buf;
|
||||||
|
int32_t l = 0;
|
||||||
|
l = serializeUdfColumnData(&callRsp->resultData, buf);
|
||||||
|
buf += l;
|
||||||
|
l = serializeUdfInterBuf(&callRsp->interBuf, buf);
|
||||||
|
buf += l;
|
||||||
|
|
||||||
|
return buf - bufBegin;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t deserializeUdfCallResponse(SUdfCallResponse *callRsp, char * buf) {
|
||||||
|
char *bufBegin = buf;
|
||||||
|
int32_t l = 0;
|
||||||
|
l =deserializeUdfColumnData(&callRsp->resultData, buf);
|
||||||
|
buf += l;
|
||||||
|
l = deserializeUdfInterBuf(&callRsp->interBuf, buf);
|
||||||
|
buf += l;
|
||||||
|
return buf - bufBegin;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t serializeUdfRequest(SUdfRequest *request, char *buf) {
|
||||||
|
char* bufBegin = buf;
|
||||||
|
//skip msglen first
|
||||||
|
buf += sizeof(int32_t);
|
||||||
|
|
||||||
|
*(int64_t *) buf = request->seqNum;
|
||||||
|
buf += sizeof(int64_t);
|
||||||
|
*(int8_t *) buf = request->type;
|
||||||
|
buf += sizeof(int8_t);
|
||||||
|
|
||||||
|
int32_t l = 0;
|
||||||
|
if (request->type == UDF_TASK_SETUP) {
|
||||||
|
l = serializeUdfSetupRequest(&request->setup, buf);
|
||||||
|
buf += l;
|
||||||
|
} else if (request->type == UDF_TASK_CALL) {
|
||||||
|
l = serializeUdfCallRequest(&request->call, buf);
|
||||||
|
buf += l;
|
||||||
|
} else if (request->type == UDF_TASK_TEARDOWN){
|
||||||
|
l = serializeUdfTeardownRequest(&request->teardown, buf);
|
||||||
|
buf += l;
|
||||||
|
}
|
||||||
|
*(int32_t*)bufBegin = buf - bufBegin;
|
||||||
|
return buf - bufBegin;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t deserializeUdfRequest(SUdfRequest *request, char *buf) {
|
||||||
|
char* bufBegin = buf;
|
||||||
request->msgLen = *(int32_t *) (buf);
|
request->msgLen = *(int32_t *) (buf);
|
||||||
buf += sizeof(int32_t);
|
buf += sizeof(int32_t);
|
||||||
request->seqNum = *(int64_t *) (buf);
|
request->seqNum = *(int64_t *) (buf);
|
||||||
|
@ -310,89 +543,27 @@ int32_t decodeRequest(char *bufMsg, int32_t bufLen, SUdfRequest **pRequest) {
|
||||||
request->type = *(int8_t *) (buf);
|
request->type = *(int8_t *) (buf);
|
||||||
buf += sizeof(int8_t);
|
buf += sizeof(int8_t);
|
||||||
|
|
||||||
switch (request->type) {
|
int32_t l = 0;
|
||||||
case UDF_TASK_SETUP: {
|
if (request->type == UDF_TASK_SETUP) {
|
||||||
SUdfSetupRequest *setup = taosMemoryMalloc(sizeof(SUdfSetupRequest));
|
l = deserializeUdfSetupRequest(&request->setup, buf);
|
||||||
|
buf += l;
|
||||||
memcpy(setup->udfName, buf, 16);
|
} else if (request->type == UDF_TASK_CALL) {
|
||||||
buf += 16;
|
l = deserializeUdfCallRequest(&request->call, buf);
|
||||||
setup->scriptType = *(int8_t *) buf;
|
buf += l;
|
||||||
buf += sizeof(int8_t);
|
} else if (request->type == UDF_TASK_TEARDOWN){
|
||||||
setup->udfType = *(int8_t *) buf;
|
l = deserializeUdfTeardownRequest(&request->teardown, buf);
|
||||||
buf += sizeof(int8_t);
|
buf += l;
|
||||||
setup->pathSize = *(int16_t *) buf;
|
|
||||||
buf += sizeof(int16_t);
|
|
||||||
setup->path = buf;
|
|
||||||
buf += setup->pathSize;
|
|
||||||
|
|
||||||
request->subReq = setup;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case UDF_TASK_CALL: {
|
|
||||||
SUdfCallRequest *call = taosMemoryMalloc(sizeof(SUdfCallRequest));
|
|
||||||
|
|
||||||
call->udfHandle = *(int64_t *) buf;
|
|
||||||
buf += sizeof(int64_t);
|
|
||||||
call->step = *(int8_t *) buf;
|
|
||||||
buf += sizeof(int8_t);
|
|
||||||
call->inputBytes = *(int32_t *) buf;
|
|
||||||
buf += sizeof(int32_t);
|
|
||||||
call->input = buf;
|
|
||||||
buf += call->inputBytes;
|
|
||||||
call->stateBytes = *(int32_t *) buf;
|
|
||||||
buf += sizeof(int32_t);
|
|
||||||
call->state = buf;
|
|
||||||
buf += call->stateBytes;
|
|
||||||
|
|
||||||
request->subReq = call;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
case UDF_TASK_TEARDOWN: {
|
|
||||||
SUdfTeardownRequest *teardown = taosMemoryMalloc(sizeof(SUdfTeardownRequest));
|
|
||||||
|
|
||||||
teardown->udfHandle = *(int64_t *) buf;
|
|
||||||
buf += sizeof(int64_t);
|
|
||||||
|
|
||||||
request->subReq = teardown;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
if (buf - bufMsg != bufLen) {
|
int32_t totalLen = buf-bufBegin;
|
||||||
debugPrint("%s", "decode request error");
|
if (totalLen != request->msgLen) {
|
||||||
taosMemoryFree(request->subReq);
|
debugPrint("decoding request error");
|
||||||
taosMemoryFree(request);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
*pRequest = request;
|
return buf - bufBegin;
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t encodeResponse(char **pBuf, int32_t *pBufLen, SUdfResponse *response) {
|
int32_t serializeUdfResponse(SUdfResponse *response, char *buf) {
|
||||||
debugPrint("%s", "encoding response");
|
char* bufBegin = buf;
|
||||||
|
|
||||||
int32_t len = sizeof(SUdfResponse) - sizeof(void *);
|
|
||||||
|
|
||||||
switch (response->type) {
|
|
||||||
case UDF_TASK_SETUP: {
|
|
||||||
len += sizeof(SUdfSetupResponse);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case UDF_TASK_CALL: {
|
|
||||||
SUdfCallResponse *callResp = (SUdfCallResponse *) (response->subRsp);
|
|
||||||
len += sizeof(SUdfCallResponse) - 2 * sizeof(char *) +
|
|
||||||
callResp->outputBytes + callResp->newStateBytes;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case UDF_TASK_TEARDOWN: {
|
|
||||||
len += sizeof(SUdfTeardownResponse);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
char *bufBegin = taosMemoryMalloc(len);
|
|
||||||
char *buf = bufBegin;
|
|
||||||
|
|
||||||
//skip msgLen
|
//skip msgLen
|
||||||
buf += sizeof(int32_t);
|
buf += sizeof(int32_t);
|
||||||
|
|
||||||
|
@ -402,51 +573,30 @@ int32_t encodeResponse(char **pBuf, int32_t *pBufLen, SUdfResponse *response) {
|
||||||
buf += sizeof(int8_t);
|
buf += sizeof(int8_t);
|
||||||
*(int32_t *) buf = response->code;
|
*(int32_t *) buf = response->code;
|
||||||
buf += sizeof(int32_t);
|
buf += sizeof(int32_t);
|
||||||
|
int32_t l = 0;
|
||||||
|
|
||||||
switch (response->type) {
|
switch (response->type) {
|
||||||
case UDF_TASK_SETUP: {
|
case UDF_TASK_SETUP: {
|
||||||
SUdfSetupResponse *setupResp = (SUdfSetupResponse *) (response->subRsp);
|
l = serializeUdfSetupResponse(&response->setupRsp, buf);
|
||||||
*(int64_t *) buf = setupResp->udfHandle;
|
buf += l;
|
||||||
buf += sizeof(int64_t);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case UDF_TASK_CALL: {
|
case UDF_TASK_CALL: {
|
||||||
SUdfCallResponse *callResp = (SUdfCallResponse *) (response->subRsp);
|
l = serializeUdfCallResponse(&response->callRsp, buf);
|
||||||
*(int32_t *) buf = callResp->outputBytes;
|
buf += l;
|
||||||
buf += sizeof(int32_t);
|
|
||||||
memcpy(buf, callResp->output, callResp->outputBytes);
|
|
||||||
buf += callResp->outputBytes;
|
|
||||||
|
|
||||||
*(int32_t *) buf = callResp->newStateBytes;
|
|
||||||
buf += sizeof(int32_t);
|
|
||||||
memcpy(buf, callResp->newState, callResp->newStateBytes);
|
|
||||||
buf += callResp->newStateBytes;
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case UDF_TASK_TEARDOWN: {
|
case UDF_TASK_TEARDOWN: {
|
||||||
SUdfTeardownResponse *teardownResp = (SUdfTeardownResponse *) (response->subRsp);
|
l = serializeUdfTeardownResponse(&response->teardownRsp, buf);
|
||||||
break;
|
buf += l;
|
||||||
}
|
}
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
response->msgLen = buf - bufBegin;
|
|
||||||
*(int32_t *) bufBegin = response->msgLen;
|
*(int32_t*)bufBegin = buf - bufBegin;
|
||||||
*pBuf = bufBegin;
|
return buf - bufBegin;
|
||||||
*pBufLen = response->msgLen;
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t decodeResponse(char *bufMsg, int32_t bufLen, SUdfResponse **pResponse) {
|
int32_t deserializeUdfResponse(SUdfResponse *rsp, char *buf) {
|
||||||
debugPrint("%s", "decoding response");
|
char* bufBegin = buf;
|
||||||
|
|
||||||
if (*(int32_t *) bufMsg != bufLen) {
|
|
||||||
debugPrint("%s", "can not decode response");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
char *buf = bufMsg;
|
|
||||||
SUdfResponse *rsp = taosMemoryMalloc(sizeof(SUdfResponse));
|
|
||||||
rsp->msgLen = *(int32_t *) buf;
|
rsp->msgLen = *(int32_t *) buf;
|
||||||
buf += sizeof(int32_t);
|
buf += sizeof(int32_t);
|
||||||
rsp->seqNum = *(int64_t *) buf;
|
rsp->seqNum = *(int64_t *) buf;
|
||||||
|
@ -455,47 +605,115 @@ int32_t decodeResponse(char *bufMsg, int32_t bufLen, SUdfResponse **pResponse) {
|
||||||
buf += sizeof(int8_t);
|
buf += sizeof(int8_t);
|
||||||
rsp->code = *(int32_t *) buf;
|
rsp->code = *(int32_t *) buf;
|
||||||
buf += sizeof(int32_t);
|
buf += sizeof(int32_t);
|
||||||
|
int32_t l = 0;
|
||||||
switch (rsp->type) {
|
switch (rsp->type) {
|
||||||
case UDF_TASK_SETUP: {
|
case UDF_TASK_SETUP: {
|
||||||
SUdfSetupResponse *setupRsp = (SUdfSetupResponse *) taosMemoryMalloc(sizeof(SUdfSetupResponse));
|
l = deserializeUdfSetupResponse(&rsp->setupRsp, buf);
|
||||||
setupRsp->udfHandle = *(int64_t *) buf;
|
buf += l;
|
||||||
buf += sizeof(int64_t);
|
|
||||||
rsp->subRsp = (char *) setupRsp;
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case UDF_TASK_CALL: {
|
case UDF_TASK_CALL: {
|
||||||
SUdfCallResponse *callRsp = (SUdfCallResponse *) taosMemoryMalloc(sizeof(SUdfCallResponse));
|
l = deserializeUdfCallResponse(&rsp->callRsp, buf);
|
||||||
callRsp->outputBytes = *(int32_t *) buf;
|
buf += l;
|
||||||
buf += sizeof(int32_t);
|
|
||||||
|
|
||||||
callRsp->output = buf;
|
|
||||||
buf += callRsp->outputBytes;
|
|
||||||
|
|
||||||
callRsp->newStateBytes = *(int32_t *) buf;
|
|
||||||
buf += sizeof(int32_t);
|
|
||||||
|
|
||||||
callRsp->newState = buf;
|
|
||||||
buf += callRsp->newStateBytes;
|
|
||||||
|
|
||||||
rsp->subRsp = callRsp;
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case UDF_TASK_TEARDOWN: {
|
case UDF_TASK_TEARDOWN: {
|
||||||
SUdfTeardownResponse *teardownRsp = (SUdfTeardownResponse *) taosMemoryMalloc(sizeof(SUdfTeardownResponse));
|
l = deserializeUdfTeardownResponse(&rsp->teardownRsp, buf);
|
||||||
rsp->subRsp = teardownRsp;
|
buf += l;
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
if (buf - bufMsg != bufLen) {
|
int32_t total = buf - bufBegin;
|
||||||
debugPrint("%s", "can not decode response");
|
if (total != rsp->msgLen) {
|
||||||
taosMemoryFree(rsp->subRsp);
|
debugPrint("decode response error");
|
||||||
taosMemoryFree(rsp);
|
return -1;
|
||||||
|
}
|
||||||
|
return buf - bufBegin;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t estimateUdfRequestLen(SUdfRequest *request) {
|
||||||
|
// a larger estimated is generated
|
||||||
|
int32_t size = sizeof(SUdfRequest);
|
||||||
|
if (request->type == UDF_TASK_SETUP) {
|
||||||
|
size += request->setup.pathSize;
|
||||||
|
} else if (request->type == UDF_TASK_CALL) {
|
||||||
|
for (int32_t i = 0; i < request->call.block.numOfCols; ++i) {
|
||||||
|
SUdfColumn* col = request->call.block.udfCols + i;
|
||||||
|
if (col->colData.varLengthColumn) {
|
||||||
|
size += col->colData.varOffsetsLen;
|
||||||
|
size += col->colData.payloadLen;
|
||||||
|
} else {
|
||||||
|
size += col->colData.nullBitmapLen;
|
||||||
|
size += col->colData.dataLen;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
size += request->call.interBuf.bufLen;
|
||||||
|
}
|
||||||
|
return size;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t estimateUdfResponseLen(SUdfResponse *response) {
|
||||||
|
int32_t size = sizeof(SUdfResponse);
|
||||||
|
if (response->type == UDF_TASK_CALL) {
|
||||||
|
size += response->callRsp.interBuf.bufLen;
|
||||||
|
SUdfColumnData *resultData = &response->callRsp.resultData;
|
||||||
|
if (!resultData->varLengthColumn) {
|
||||||
|
size += resultData->nullBitmapLen;
|
||||||
|
size += resultData->dataLen;
|
||||||
|
} else {
|
||||||
|
size += resultData->varOffsetsLen;
|
||||||
|
size += resultData->payloadLen;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return size;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t encodeRequest(char **pBuf, int32_t *pBufLen, SUdfRequest *request) {
|
||||||
|
debugPrint("%s", "encoding request");
|
||||||
|
|
||||||
|
int len = estimateUdfRequestLen(request);
|
||||||
|
|
||||||
|
char *bufBegin = taosMemoryMalloc(len);
|
||||||
|
char *buf = bufBegin;
|
||||||
|
serializeUdfRequest(request, buf);
|
||||||
|
*pBuf = bufBegin;
|
||||||
|
*pBufLen = request->msgLen;
|
||||||
|
debugPrint("\tLen: estimate: %d, actual:%d", len, *pBufLen);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t decodeRequest(char *bufMsg, int32_t bufLen, SUdfRequest *pRequest) {
|
||||||
|
debugPrint("%s", "decoding request");
|
||||||
|
if (*(int32_t *) bufMsg != bufLen) {
|
||||||
|
debugPrint("%s", "decoding request error");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
*pResponse = rsp;
|
char *buf = bufMsg;
|
||||||
|
deserializeUdfRequest(pRequest, buf);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t encodeResponse(char **pBuf, int32_t *pBufLen, SUdfResponse *response) {
|
||||||
|
debugPrint("%s", "encoding response");
|
||||||
|
int32_t len = estimateUdfResponseLen(response);
|
||||||
|
|
||||||
|
char *bufBegin = taosMemoryMalloc(len);
|
||||||
|
char *buf = bufBegin;
|
||||||
|
serializeUdfResponse(response, buf);
|
||||||
|
*pBuf = bufBegin;
|
||||||
|
*pBufLen = response->msgLen;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t decodeResponse(char *bufMsg, int32_t bufLen, SUdfResponse *rsp) {
|
||||||
|
debugPrint("%s", "decoding response");
|
||||||
|
|
||||||
|
if (*(int32_t *) bufMsg != bufLen) {
|
||||||
|
debugPrint("%s", "can not decode response");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
char *buf = bufMsg;
|
||||||
|
deserializeUdfResponse(rsp, buf);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -519,23 +737,23 @@ int32_t udfcGetUvTaskResponseResult(SClientUdfTask *task, SClientUvTaskNode *uvT
|
||||||
debugPrint("%s", "get uv task result");
|
debugPrint("%s", "get uv task result");
|
||||||
if (uvTask->type == UV_TASK_REQ_RSP) {
|
if (uvTask->type == UV_TASK_REQ_RSP) {
|
||||||
if (uvTask->rspBuf.base != NULL) {
|
if (uvTask->rspBuf.base != NULL) {
|
||||||
SUdfResponse *rsp;
|
SUdfResponse rsp;
|
||||||
decodeResponse(uvTask->rspBuf.base, uvTask->rspBuf.len, &rsp);
|
decodeResponse(uvTask->rspBuf.base, uvTask->rspBuf.len, &rsp);
|
||||||
task->errCode = rsp->code;
|
task->errCode = rsp.code;
|
||||||
|
|
||||||
switch (task->type) {
|
switch (task->type) {
|
||||||
case UDF_TASK_SETUP: {
|
case UDF_TASK_SETUP: {
|
||||||
//TODO: copy or not
|
//TODO: copy or not
|
||||||
task->_setup.rsp = *(SUdfSetupResponse *) (rsp->subRsp);
|
task->_setup.rsp = rsp.setupRsp;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case UDF_TASK_CALL: {
|
case UDF_TASK_CALL: {
|
||||||
task->_call.rsp = *(SUdfCallResponse *) (rsp->subRsp);
|
task->_call.rsp = rsp.callRsp;
|
||||||
//TODO: copy or not
|
//TODO: copy or not
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case UDF_TASK_TEARDOWN: {
|
case UDF_TASK_TEARDOWN: {
|
||||||
task->_teardown.rsp = *(SUdfTeardownResponse *) (rsp->subRsp);
|
task->_teardown.rsp = rsp.teardownRsp;
|
||||||
//TODO: copy or not?
|
//TODO: copy or not?
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -546,8 +764,6 @@ int32_t udfcGetUvTaskResponseResult(SClientUdfTask *task, SClientUvTaskNode *uvT
|
||||||
|
|
||||||
// TODO: the call buffer is setup and freed by udf invocation
|
// TODO: the call buffer is setup and freed by udf invocation
|
||||||
taosMemoryFree(uvTask->rspBuf.base);
|
taosMemoryFree(uvTask->rspBuf.base);
|
||||||
taosMemoryFree(rsp->subRsp);
|
|
||||||
taosMemoryFree(rsp);
|
|
||||||
} else {
|
} else {
|
||||||
task->errCode = uvTask->errCode;
|
task->errCode = uvTask->errCode;
|
||||||
}
|
}
|
||||||
|
@ -714,13 +930,13 @@ int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN
|
||||||
request.seqNum = gUdfTaskSeqNum++;
|
request.seqNum = gUdfTaskSeqNum++;
|
||||||
|
|
||||||
if (task->type == UDF_TASK_SETUP) {
|
if (task->type == UDF_TASK_SETUP) {
|
||||||
request.subReq = &task->_setup.req;
|
request.setup = task->_setup.req;
|
||||||
request.type = UDF_TASK_SETUP;
|
request.type = UDF_TASK_SETUP;
|
||||||
} else if (task->type == UDF_TASK_CALL) {
|
} else if (task->type == UDF_TASK_CALL) {
|
||||||
request.subReq = &task->_call.req;
|
request.call = task->_call.req;
|
||||||
request.type = UDF_TASK_CALL;
|
request.type = UDF_TASK_CALL;
|
||||||
} else if (task->type == UDF_TASK_TEARDOWN) {
|
} else if (task->type == UDF_TASK_TEARDOWN) {
|
||||||
request.subReq = &task->_teardown.req;
|
request.teardown = task->_teardown.req;
|
||||||
request.type = UDF_TASK_TEARDOWN;
|
request.type = UDF_TASK_TEARDOWN;
|
||||||
} else {
|
} else {
|
||||||
//TODO log and return error
|
//TODO log and return error
|
||||||
|
@ -947,8 +1163,7 @@ int32_t udfcRunUvTask(SClientUdfTask *task, int8_t uvTaskType) {
|
||||||
udfcGetUvTaskResponseResult(task, uvTask);
|
udfcGetUvTaskResponseResult(task, uvTask);
|
||||||
if (uvTaskType == UV_TASK_CONNECT) {
|
if (uvTaskType == UV_TASK_CONNECT) {
|
||||||
task->session->udfSvcPipe = uvTask->pipe;
|
task->session->udfSvcPipe = uvTask->pipe;
|
||||||
}
|
} taosMemoryFree(uvTask);
|
||||||
taosMemoryFree(uvTask);
|
|
||||||
uvTask = NULL;
|
uvTask = NULL;
|
||||||
return task->errCode;
|
return task->errCode;
|
||||||
}
|
}
|
||||||
|
@ -983,8 +1198,8 @@ int32_t setupUdf(SUdfInfo *udfInfo, UdfHandle *handle) {
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t callUdf(UdfHandle handle, int8_t step, char *state, int32_t stateSize, SUdfDataBlock input, char **newState,
|
int32_t callUdf(UdfHandle handle, int8_t callType, SUdfDataBlock *input, SUdfInterBuf *state,
|
||||||
int32_t *newStateSize, SUdfDataBlock *output) {
|
SUdfColumnData* output, SUdfInterBuf *newState, bool initFirst) {
|
||||||
debugPrint("%s", "client call udf");
|
debugPrint("%s", "client call udf");
|
||||||
|
|
||||||
SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask));
|
SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask));
|
||||||
|
@ -993,24 +1208,47 @@ int32_t callUdf(UdfHandle handle, int8_t step, char *state, int32_t stateSize, S
|
||||||
task->type = UDF_TASK_CALL;
|
task->type = UDF_TASK_CALL;
|
||||||
|
|
||||||
SUdfCallRequest *req = &task->_call.req;
|
SUdfCallRequest *req = &task->_call.req;
|
||||||
|
switch (callType) {
|
||||||
|
case TSDB_UDF_CALL_AGG_PROC: {
|
||||||
|
req->block = *input;
|
||||||
|
req->interBuf = *state;
|
||||||
|
req->initFirst = initFirst;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDb_UDF_CALL_AGG_FIN: {
|
||||||
|
req->interBuf = *state;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_UDF_CALL_SCALA_PROC: {
|
||||||
|
req->block = *input;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
req->state = state;
|
|
||||||
req->stateBytes = stateSize;
|
|
||||||
req->inputBytes = input.size;
|
|
||||||
req->input = input.data;
|
|
||||||
req->udfHandle = task->session->severHandle;
|
|
||||||
req->step = step;
|
|
||||||
|
|
||||||
udfcRunUvTask(task, UV_TASK_REQ_RSP);
|
udfcRunUvTask(task, UV_TASK_REQ_RSP);
|
||||||
|
|
||||||
SUdfCallResponse *rsp = &task->_call.rsp;
|
SUdfCallResponse *rsp = &task->_call.rsp;
|
||||||
*newState = rsp->newState;
|
switch (callType) {
|
||||||
*newStateSize = rsp->newStateBytes;
|
case TSDB_UDF_CALL_AGG_PROC: {
|
||||||
output->size = rsp->outputBytes;
|
*newState = rsp->interBuf;
|
||||||
output->data = rsp->output;
|
break;
|
||||||
int32_t err = task->errCode;
|
}
|
||||||
|
|
||||||
|
case TSDb_UDF_CALL_AGG_FIN: {
|
||||||
|
*output = rsp->resultData;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_UDF_CALL_SCALA_PROC: {
|
||||||
|
*output = rsp->resultData;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
taosMemoryFree(task);
|
taosMemoryFree(task);
|
||||||
return err;
|
return task->errCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t teardownUdf(UdfHandle handle) {
|
int32_t teardownUdf(UdfHandle handle) {
|
||||||
|
|
|
@ -44,7 +44,7 @@ typedef struct SUdf {
|
||||||
int8_t type;
|
int8_t type;
|
||||||
|
|
||||||
uv_lib_t lib;
|
uv_lib_t lib;
|
||||||
TUdfFunc normalFunc;
|
TUdfScalarProcFunc normalFunc;
|
||||||
} SUdf;
|
} SUdf;
|
||||||
|
|
||||||
//TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix
|
//TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix
|
||||||
|
|
Loading…
Reference in New Issue