SSDataBlock and SUdfInterBuf message passing between taosd/udfd
This commit is contained in:
parent
3526c0d455
commit
4817f54ae9
|
@ -20,6 +20,7 @@
|
|||
#include <stdint.h>
|
||||
#include <stdbool.h>
|
||||
#include "tmsg.h"
|
||||
#include "tcommon.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
|
@ -97,18 +98,20 @@ typedef struct SUdfInterBuf {
|
|||
char* buf;
|
||||
} SUdfInterBuf;
|
||||
|
||||
//TODO: translate these calls to callUdf
|
||||
int32_t callUdfAggInit(SUdfInterBuf *interBuf);
|
||||
// input: block, initFirst
|
||||
// output: interbuf
|
||||
int32_t callUdfAggProcess(SUdfDataBlock *block, SUdfInterBuf *interBuf, bool initFirst);
|
||||
int32_t callUdfAggProcess(SSDataBlock *block, SUdfInterBuf *interBuf);
|
||||
// input: interBuf
|
||||
// output: resultData
|
||||
int32_t callUdfAggFinalize(SUdfInterBuf *interBuf, SUdfColumnData *resultData);
|
||||
int32_t callUdfAggFinalize(SUdfInterBuf *interBuf, SSDataBlock *resultData);
|
||||
// input: interbuf1, interbuf2
|
||||
// output: resultBuf
|
||||
int32_t callUdfAggMerge(SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf);
|
||||
// input: block
|
||||
// output: resultData
|
||||
int32_t callUdfScalaProcess(SUdfDataBlock *block, SUdfColumnData *resultData);
|
||||
int32_t callUdfScalaProcess(SSDataBlock *block, SSDataBlock *resultData);
|
||||
|
||||
/**
|
||||
* tearn down udf
|
||||
|
@ -125,6 +128,7 @@ int32_t teardownUdf(UdfHandle handle);
|
|||
typedef int32_t (*TUdfSetupFunc)();
|
||||
typedef int32_t (*TUdfTeardownFunc)();
|
||||
|
||||
//TODO: add API to check function arguments type, number etc.
|
||||
//TODO: another way to manage memory is provide api for UDF to add data to SUdfColumnData and UDF framework will allocate memory.
|
||||
// then UDF framework will free the memory
|
||||
//typedef int32_t addFixedLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t colBytes, char* data);
|
||||
|
|
|
@ -30,9 +30,10 @@ enum {
|
|||
};
|
||||
|
||||
enum {
|
||||
TSDB_UDF_CALL_AGG_PROC = 0,
|
||||
TSDB_UDF_CALL_AGG_INIT = 0,
|
||||
TSDB_UDF_CALL_AGG_PROC,
|
||||
TSDB_UDF_CALL_AGG_MERGE,
|
||||
TSDb_UDF_CALL_AGG_FIN,
|
||||
TSDB_UDF_CALL_AGG_FIN,
|
||||
TSDB_UDF_CALL_SCALA_PROC,
|
||||
};
|
||||
|
||||
|
@ -49,14 +50,15 @@ typedef struct SUdfCallRequest {
|
|||
int64_t udfHandle;
|
||||
int8_t callType;
|
||||
|
||||
SUdfDataBlock block;
|
||||
SSDataBlock block;
|
||||
SUdfInterBuf interBuf;
|
||||
SUdfInterBuf interBuf2;
|
||||
bool initFirst;
|
||||
int8_t initFirst;
|
||||
} SUdfCallRequest;
|
||||
|
||||
typedef struct SUdfCallResponse {
|
||||
SUdfColumnData resultData;
|
||||
int8_t callType;
|
||||
SSDataBlock resultData;
|
||||
SUdfInterBuf interBuf;
|
||||
} SUdfCallResponse;
|
||||
|
||||
|
@ -94,10 +96,11 @@ typedef struct SUdfResponse {
|
|||
};
|
||||
} SUdfResponse;
|
||||
|
||||
int32_t decodeRequest(char *buf, int32_t bufLen, SUdfRequest *pRequest);
|
||||
int32_t encodeRequest(char **buf, int32_t *bufLen, SUdfRequest *request);
|
||||
int32_t decodeResponse(char *buf, int32_t bufLen, SUdfResponse *pResponse);
|
||||
int32_t encodeResponse(char **buf, int32_t *bufLen, SUdfResponse *response);
|
||||
int32_t encodeUdfRequest(void **buf, const SUdfRequest* request);
|
||||
void* decodeUdfRequest(const void *buf, SUdfRequest* request);
|
||||
|
||||
int32_t encodeUdfResponse(void **buf, const SUdfResponse *response);
|
||||
void* decodeUdfResponse(const void* buf, SUdfResponse *response);
|
||||
|
||||
void freeUdfColumnData(SUdfColumnData *data);
|
||||
void freeUdfColumn(SUdfColumn* col);
|
||||
|
|
|
@ -213,507 +213,241 @@ QUEUE gUdfTaskQueue = {0};
|
|||
|
||||
QUEUE gUvProcTaskQueue = {0};
|
||||
|
||||
int32_t serializeUdfColumnData(SUdfColumnData* data, char* pBuf) {
|
||||
char* bufBeg = pBuf;
|
||||
*(int32_t*)pBuf = data->numOfRows;
|
||||
pBuf += sizeof(int32_t);
|
||||
*(bool*)pBuf = data->varLengthColumn;
|
||||
pBuf += sizeof(bool);
|
||||
int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) {
|
||||
int32_t len = 0;
|
||||
len += taosEncodeBinary(buf, setup->udfName, TSDB_FUNC_NAME_LEN);
|
||||
len += taosEncodeSEpSet(buf, &setup->epSet);
|
||||
return len;
|
||||
}
|
||||
|
||||
if (!data->varLengthColumn) {
|
||||
*(int32_t*)pBuf = data->nullBitmapLen;
|
||||
pBuf += sizeof(int32_t);
|
||||
memcpy(pBuf, data->nullBitmap, data->nullBitmapLen);
|
||||
pBuf += data->nullBitmapLen;
|
||||
*(int32_t*)pBuf = data->dataLen;
|
||||
pBuf += sizeof(int32_t);
|
||||
memcpy(pBuf, data->data, data->dataLen);
|
||||
pBuf += data->dataLen;
|
||||
void* decodeUdfSetupRequest(const void* buf, SUdfSetupRequest *request) {
|
||||
buf = taosDecodeBinaryTo(buf, request->udfName, TSDB_FUNC_NAME_LEN);
|
||||
buf = taosDecodeSEpSet((void*)buf, &request->epSet);
|
||||
return (void*)buf;
|
||||
}
|
||||
|
||||
int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf* state) {
|
||||
int32_t len = 0;
|
||||
len += taosEncodeFixedI32(buf, state->bufLen);
|
||||
len += taosEncodeBinary(buf, state->buf, state->bufLen);
|
||||
return len;
|
||||
}
|
||||
|
||||
void* decodeUdfInterBuf(const void* buf, SUdfInterBuf* state) {
|
||||
buf = taosDecodeFixedI32(buf, &state->bufLen);
|
||||
buf = taosDecodeBinary(buf, (void**)&state->buf, state->bufLen);
|
||||
return (void*)buf;
|
||||
}
|
||||
|
||||
int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call) {
|
||||
int32_t len = 0;
|
||||
len += taosEncodeFixedI64(buf, call->udfHandle);
|
||||
len += taosEncodeFixedI8(buf, call->callType);
|
||||
if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
|
||||
len += tEncodeDataBlock(buf, &call->block);
|
||||
} else if (call->callType == TSDB_UDF_CALL_AGG_INIT) {
|
||||
len += taosEncodeFixedI8(buf, call->initFirst);
|
||||
} else if (call->callType == TSDB_UDF_CALL_AGG_PROC) {
|
||||
len += tEncodeDataBlock(buf, &call->block);
|
||||
len += encodeUdfInterBuf(buf, &call->interBuf);
|
||||
} else if (call->callType == TSDB_UDF_CALL_AGG_MERGE) {
|
||||
len += encodeUdfInterBuf(buf, &call->interBuf);
|
||||
len += encodeUdfInterBuf(buf, &call->interBuf2);
|
||||
} else if (call->callType == TSDB_UDF_CALL_AGG_FIN) {
|
||||
len += encodeUdfInterBuf(buf, &call->interBuf);
|
||||
}
|
||||
return len;
|
||||
}
|
||||
|
||||
void* decodeUdfCallRequest(const void* buf, SUdfCallRequest* call) {
|
||||
buf = taosDecodeFixedI64(buf, &call->udfHandle);
|
||||
buf = taosDecodeFixedI8(buf, &call->callType);
|
||||
switch (call->callType) {
|
||||
case TSDB_UDF_CALL_SCALA_PROC:
|
||||
buf = tDecodeDataBlock(buf, &call->block);
|
||||
break;
|
||||
case TSDB_UDF_CALL_AGG_INIT:
|
||||
buf = taosDecodeFixedI8(buf, &call->initFirst);
|
||||
break;
|
||||
case TSDB_UDF_CALL_AGG_PROC:
|
||||
buf = tDecodeDataBlock(buf, &call->block);
|
||||
buf = decodeUdfInterBuf(buf, &call->interBuf);
|
||||
break;
|
||||
case TSDB_UDF_CALL_AGG_MERGE:
|
||||
buf = decodeUdfInterBuf(buf, &call->interBuf);
|
||||
buf = decodeUdfInterBuf(buf, &call->interBuf2);
|
||||
break;
|
||||
case TSDB_UDF_CALL_AGG_FIN:
|
||||
buf = decodeUdfInterBuf(buf, &call->interBuf);
|
||||
break;
|
||||
}
|
||||
return (void*)buf;
|
||||
}
|
||||
|
||||
int32_t encodeUdfTeardownRequest(void **buf, const SUdfTeardownRequest *teardown) {
|
||||
int32_t len = 0;
|
||||
len += taosEncodeFixedI64(buf, teardown->udfHandle);
|
||||
return len;
|
||||
}
|
||||
|
||||
void* decodeUdfTeardownRequest(const void* buf, SUdfTeardownRequest *teardown) {
|
||||
buf = taosDecodeFixedI64(buf, &teardown->udfHandle);
|
||||
return (void*)buf;
|
||||
}
|
||||
|
||||
int32_t encodeUdfRequest(void** buf, const SUdfRequest* request) {
|
||||
int32_t len = 0;
|
||||
if (buf == NULL) {
|
||||
len += sizeof(request->msgLen);
|
||||
} else {
|
||||
*(int32_t*)pBuf = data->varOffsetsLen;
|
||||
pBuf += sizeof(int32_t);
|
||||
memcpy(pBuf, data->varOffsets, data->varOffsetsLen);
|
||||
pBuf += data->varOffsetsLen;
|
||||
*(int32_t*)pBuf = data->payloadLen;
|
||||
pBuf += sizeof(int32_t);
|
||||
memcpy(pBuf, data->payload, data->payloadLen);
|
||||
pBuf += data->payloadLen;
|
||||
*(int32_t*)(*buf) = request->msgLen;
|
||||
*buf = POINTER_SHIFT(*buf, sizeof(request->msgLen));
|
||||
}
|
||||
int32_t len = pBuf - bufBeg;
|
||||
return len;
|
||||
}
|
||||
|
||||
int32_t deserializeUdfColumnData(SUdfColumnData* pData, char* buf) {
|
||||
char* bufBeg = buf;
|
||||
pData->numOfRows = *(int32_t*)buf;
|
||||
buf += sizeof(int32_t);
|
||||
pData->varLengthColumn = *(bool*)buf;
|
||||
buf += sizeof(bool);
|
||||
|
||||
if (!pData->varLengthColumn) {
|
||||
pData->nullBitmapLen = *(int32_t*)buf;
|
||||
buf += sizeof(int32_t);
|
||||
|
||||
//TODO: optimize for less memory copy
|
||||
pData->nullBitmap = taosMemoryMalloc(pData->nullBitmapLen);
|
||||
memcpy(pData->nullBitmap, buf, pData->nullBitmapLen);
|
||||
buf += pData->nullBitmapLen;
|
||||
|
||||
pData->dataLen = *(int32_t*)buf;
|
||||
buf += sizeof(int32_t);
|
||||
|
||||
pData->data = taosMemoryMalloc(pData->dataLen);
|
||||
memcpy(pData->data, buf, pData->dataLen);
|
||||
buf += pData->dataLen;
|
||||
} else {
|
||||
pData->varOffsetsLen = *(int32_t*)buf;
|
||||
buf += sizeof(int32_t);
|
||||
|
||||
pData->varOffsets = taosMemoryMalloc(pData->varOffsetsLen);
|
||||
memcpy(pData->varOffsets, buf, pData->varOffsetsLen);
|
||||
buf += pData->varOffsetsLen;
|
||||
|
||||
pData->payloadLen = *(int32_t*)buf;
|
||||
buf += sizeof(int32_t);
|
||||
|
||||
pData->payload = taosMemoryMalloc(pData->payloadLen);
|
||||
memcpy(pData->payload, buf, pData->payloadLen);
|
||||
buf += pData->payloadLen;
|
||||
}
|
||||
int32_t len = buf - bufBeg;
|
||||
return len;
|
||||
}
|
||||
|
||||
int32_t serializeUdfColumnMeta(SUdfColumnMeta *meta, char* pBuf) {
|
||||
char* bufBeg = pBuf;
|
||||
memcpy(pBuf, meta, sizeof(SUdfColumnMeta));
|
||||
pBuf += sizeof(SUdfColumnMeta);
|
||||
|
||||
int32_t len = pBuf - bufBeg;
|
||||
return len;
|
||||
}
|
||||
|
||||
int32_t deserializeUdfColumnMeta(SUdfColumnMeta *pMeta, char* buf) {
|
||||
char *bufBegin = buf;
|
||||
memcpy(pMeta, buf, sizeof(SUdfColumnMeta));
|
||||
buf += sizeof(SUdfColumnMeta);
|
||||
|
||||
int32_t len = buf - bufBegin;
|
||||
return len;
|
||||
}
|
||||
|
||||
int32_t serializeUdfColumn(SUdfColumn *udfCol, char *pBuf) {
|
||||
char *bufBegin = pBuf;
|
||||
|
||||
int32_t len = serializeUdfColumnMeta(&udfCol->colMeta, pBuf);
|
||||
pBuf += len;
|
||||
|
||||
len = serializeUdfColumnData(&udfCol->colData, pBuf);
|
||||
pBuf += len;
|
||||
|
||||
int32_t totalLen = pBuf - bufBegin;
|
||||
return totalLen;
|
||||
}
|
||||
|
||||
int32_t deserializeUdfColumn(SUdfColumn *pUdfCol, char *buf) {
|
||||
char *bufBegin = buf;
|
||||
|
||||
int32_t len = deserializeUdfColumnMeta(&pUdfCol->colMeta, buf);
|
||||
buf += len;
|
||||
|
||||
len = deserializeUdfColumnData(&pUdfCol->colData, buf);
|
||||
buf += len;
|
||||
|
||||
int32_t totalLen = buf - bufBegin;
|
||||
return totalLen;
|
||||
}
|
||||
|
||||
int32_t serializeUdfDataBlock(SUdfDataBlock *block, char *pBuf) {
|
||||
char *bufBegin = pBuf;
|
||||
|
||||
*(int32_t*)pBuf = block->numOfRows;
|
||||
pBuf += sizeof(int32_t);
|
||||
|
||||
*(int32_t*)pBuf = block->numOfCols;
|
||||
pBuf += sizeof(int32_t);
|
||||
|
||||
for (int32_t i = 0; i < block->numOfCols; ++i) {
|
||||
SUdfColumn* col = block->udfCols[i];
|
||||
int32_t l = serializeUdfColumn(col, pBuf);
|
||||
pBuf += l;
|
||||
}
|
||||
|
||||
int32_t totalLen = pBuf - bufBegin;
|
||||
return totalLen;
|
||||
}
|
||||
|
||||
int32_t deserializeUdfDataBlock(SUdfDataBlock *pBlock, char *buf) {
|
||||
char *bufBegin = buf;
|
||||
|
||||
pBlock->numOfRows = *(int32_t*)buf;
|
||||
buf += sizeof(int32_t);
|
||||
|
||||
pBlock->numOfCols = *(int32_t*)buf;
|
||||
buf += sizeof(int32_t);
|
||||
|
||||
pBlock->udfCols = taosMemoryMalloc(sizeof(SUdfColumn*) * pBlock->numOfCols);
|
||||
for (int32_t i = 0; i < pBlock->numOfCols; ++i) {
|
||||
pBlock->udfCols[i] = taosMemoryMalloc(sizeof(SUdfColumn));
|
||||
int32_t l = deserializeUdfColumn(pBlock->udfCols[i], buf);
|
||||
buf += l;
|
||||
}
|
||||
|
||||
int32_t totalLen = buf - bufBegin;
|
||||
return totalLen;
|
||||
}
|
||||
|
||||
int32_t serializeUdfInterBuf(SUdfInterBuf *state, char *pBuf) {
|
||||
char *bufBegin = pBuf;
|
||||
|
||||
*(int32_t*)pBuf = state->bufLen;
|
||||
pBuf += sizeof(int32_t);
|
||||
memcpy(pBuf, state->buf, state->bufLen);
|
||||
pBuf += state->bufLen;
|
||||
|
||||
return pBuf - bufBegin;
|
||||
}
|
||||
|
||||
int32_t deserializeUdfInterBuf(SUdfInterBuf *pState, char *buf) {
|
||||
char* bufBegin = buf;
|
||||
pState->bufLen = *(int32_t*)buf;
|
||||
buf += sizeof(int32_t);
|
||||
|
||||
pState->buf = taosMemoryMalloc(pState->bufLen);
|
||||
memcpy(pState->buf, buf, pState->bufLen);
|
||||
buf += pState->bufLen;
|
||||
return buf - bufBegin;
|
||||
}
|
||||
|
||||
int32_t serializeUdfSetupRequest(SUdfSetupRequest *setup, char *buf) {
|
||||
char *bufBegin = buf;
|
||||
|
||||
memcpy(buf, setup->udfName, TSDB_FUNC_NAME_LEN);
|
||||
buf += TSDB_FUNC_NAME_LEN;
|
||||
|
||||
memcpy(buf, &setup->epSet, sizeof(SEpSet));
|
||||
buf += sizeof(SEpSet);
|
||||
|
||||
return buf - bufBegin;
|
||||
};
|
||||
|
||||
int32_t deserializeUdfSetupRequest(SUdfSetupRequest *setup, char *buf) {
|
||||
char* bufBegin = buf;
|
||||
|
||||
memcpy(setup->udfName, buf, TSDB_FUNC_NAME_LEN);
|
||||
buf += TSDB_FUNC_NAME_LEN;
|
||||
|
||||
memcpy(&setup->epSet, buf, sizeof(SEpSet));
|
||||
buf += sizeof(SEpSet);
|
||||
return buf - bufBegin;
|
||||
}
|
||||
|
||||
int32_t serializeUdfTeardownRequest(SUdfTeardownRequest *teardown, char *buf) {
|
||||
char* bufBegin = buf;
|
||||
*(int64_t *) buf = teardown->udfHandle;
|
||||
buf += sizeof(int64_t);
|
||||
|
||||
return buf - bufBegin;
|
||||
}
|
||||
|
||||
int32_t deserializeUdfTeardownRequest(SUdfTeardownRequest *teardown, char *buf) {
|
||||
char *bufBegin = buf;
|
||||
teardown->udfHandle = *(int64_t *) buf;
|
||||
buf += sizeof(int64_t);
|
||||
return buf - bufBegin;
|
||||
}
|
||||
|
||||
int32_t serializeUdfSetupResponse(SUdfSetupResponse *setupRsp, char *buf) {
|
||||
char *bufBegin = buf;
|
||||
|
||||
*(int64_t *) buf = setupRsp->udfHandle;
|
||||
buf += sizeof(int64_t);
|
||||
|
||||
return buf-bufBegin;
|
||||
}
|
||||
|
||||
int32_t deserializeUdfSetupResponse(SUdfSetupResponse *setupRsp, char *buf) {
|
||||
char *bufBegin = buf;
|
||||
setupRsp->udfHandle = *(int64_t *) buf;
|
||||
buf += sizeof(int64_t);
|
||||
return buf-bufBegin;
|
||||
}
|
||||
|
||||
int32_t serializeUdfTeardownResponse(SUdfTeardownResponse *teardownRsp, char *buf) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t deserializeUdfTeardownResponse(SUdfTeardownResponse *teardownRsp, char *buf) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t serializeUdfCallRequest(SUdfCallRequest *call, char *buf) {
|
||||
char* bufBegin = buf;
|
||||
*(int64_t *) buf = call->udfHandle;
|
||||
buf += sizeof(int64_t);
|
||||
*(int8_t *) buf = call->callType;
|
||||
buf += sizeof(int8_t);
|
||||
int32_t l = 0;
|
||||
l = serializeUdfDataBlock(&call->block, buf);
|
||||
buf += l;
|
||||
l = serializeUdfInterBuf(&call->interBuf, buf);
|
||||
buf += l;
|
||||
l = serializeUdfInterBuf(&call->interBuf2, buf);
|
||||
buf += l;
|
||||
|
||||
*(bool*)buf = call->initFirst;
|
||||
buf += sizeof(bool);
|
||||
|
||||
return buf - bufBegin;
|
||||
}
|
||||
|
||||
int32_t deserializeUdfCallRequest(SUdfCallRequest *call, char *buf) {
|
||||
char* bufBegin = buf;
|
||||
call->udfHandle = *(int64_t *) buf;
|
||||
buf += sizeof(int64_t);
|
||||
call->callType = *(int8_t *) buf;
|
||||
buf += sizeof(int8_t);
|
||||
int32_t l = 0;
|
||||
l = deserializeUdfDataBlock(&call->block, buf);
|
||||
buf += l;
|
||||
l = deserializeUdfInterBuf(&call->interBuf, buf);
|
||||
buf += l;
|
||||
call->initFirst = *(bool*)buf;
|
||||
buf += sizeof(bool);
|
||||
return buf - bufBegin;
|
||||
}
|
||||
|
||||
int32_t serializeUdfCallResponse(SUdfCallResponse *callRsp, char * buf) {
|
||||
char *bufBegin = buf;
|
||||
int32_t l = 0;
|
||||
l = serializeUdfColumnData(&callRsp->resultData, buf);
|
||||
buf += l;
|
||||
l = serializeUdfInterBuf(&callRsp->interBuf, buf);
|
||||
buf += l;
|
||||
|
||||
return buf - bufBegin;
|
||||
}
|
||||
|
||||
int32_t deserializeUdfCallResponse(SUdfCallResponse *callRsp, char * buf) {
|
||||
char *bufBegin = buf;
|
||||
int32_t l = 0;
|
||||
l =deserializeUdfColumnData(&callRsp->resultData, buf);
|
||||
buf += l;
|
||||
l = deserializeUdfInterBuf(&callRsp->interBuf, buf);
|
||||
buf += l;
|
||||
return buf - bufBegin;
|
||||
}
|
||||
|
||||
int32_t serializeUdfRequest(SUdfRequest *request, char *buf) {
|
||||
char* bufBegin = buf;
|
||||
//skip msglen first
|
||||
buf += sizeof(int32_t);
|
||||
|
||||
*(int64_t *) buf = request->seqNum;
|
||||
buf += sizeof(int64_t);
|
||||
*(int8_t *) buf = request->type;
|
||||
buf += sizeof(int8_t);
|
||||
|
||||
int32_t l = 0;
|
||||
if (request->type == UDF_TASK_SETUP) {
|
||||
l = serializeUdfSetupRequest(&request->setup, buf);
|
||||
buf += l;
|
||||
} else if (request->type == UDF_TASK_CALL) {
|
||||
l = serializeUdfCallRequest(&request->call, buf);
|
||||
buf += l;
|
||||
} else if (request->type == UDF_TASK_TEARDOWN){
|
||||
l = serializeUdfTeardownRequest(&request->teardown, buf);
|
||||
buf += l;
|
||||
}
|
||||
*(int32_t*)bufBegin = buf - bufBegin;
|
||||
return buf - bufBegin;
|
||||
}
|
||||
|
||||
int32_t deserializeUdfRequest(SUdfRequest *request, char *buf) {
|
||||
char* bufBegin = buf;
|
||||
request->msgLen = *(int32_t *) (buf);
|
||||
buf += sizeof(int32_t);
|
||||
request->seqNum = *(int64_t *) (buf);
|
||||
buf += sizeof(int64_t);
|
||||
request->type = *(int8_t *) (buf);
|
||||
buf += sizeof(int8_t);
|
||||
|
||||
int32_t l = 0;
|
||||
len += taosEncodeFixedI64(buf, request->seqNum);
|
||||
len += taosEncodeFixedI8(buf, request->type);
|
||||
if (request->type == UDF_TASK_SETUP) {
|
||||
l = deserializeUdfSetupRequest(&request->setup, buf);
|
||||
buf += l;
|
||||
len += encodeUdfSetupRequest(buf, &request->setup);
|
||||
} else if (request->type == UDF_TASK_CALL) {
|
||||
l = deserializeUdfCallRequest(&request->call, buf);
|
||||
buf += l;
|
||||
} else if (request->type == UDF_TASK_TEARDOWN){
|
||||
l = deserializeUdfTeardownRequest(&request->teardown, buf);
|
||||
buf += l;
|
||||
len += encodeUdfCallRequest(buf, &request->call);
|
||||
} else if (request->type == UDF_TASK_TEARDOWN) {
|
||||
len += encodeUdfTeardownRequest(buf, &request->teardown);
|
||||
}
|
||||
int32_t totalLen = buf-bufBegin;
|
||||
if (totalLen != request->msgLen) {
|
||||
debugPrint("decoding request error");
|
||||
return -1;
|
||||
}
|
||||
return buf - bufBegin;
|
||||
return len;
|
||||
}
|
||||
|
||||
int32_t serializeUdfResponse(SUdfResponse *response, char *buf) {
|
||||
char* bufBegin = buf;
|
||||
//skip msgLen
|
||||
buf += sizeof(int32_t);
|
||||
void* decodeUdfRequest(const void* buf, SUdfRequest* request) {
|
||||
request->msgLen = *(int32_t*)(buf);
|
||||
POINTER_SHIFT(buf, sizeof(request->msgLen));
|
||||
|
||||
*(int64_t *) buf = response->seqNum;
|
||||
buf += sizeof(int64_t);
|
||||
*(int8_t *) buf = response->type;
|
||||
buf += sizeof(int8_t);
|
||||
*(int32_t *) buf = response->code;
|
||||
buf += sizeof(int32_t);
|
||||
int32_t l = 0;
|
||||
switch (response->type) {
|
||||
case UDF_TASK_SETUP: {
|
||||
l = serializeUdfSetupResponse(&response->setupRsp, buf);
|
||||
buf += l;
|
||||
break;
|
||||
}
|
||||
case UDF_TASK_CALL: {
|
||||
l = serializeUdfCallResponse(&response->callRsp, buf);
|
||||
buf += l;
|
||||
break;
|
||||
}
|
||||
case UDF_TASK_TEARDOWN: {
|
||||
l = serializeUdfTeardownResponse(&response->teardownRsp, buf);
|
||||
buf += l;
|
||||
}
|
||||
buf = taosDecodeFixedI64(buf, &request->seqNum);
|
||||
buf = taosDecodeFixedI8(buf, &request->type);
|
||||
|
||||
if (request->type == UDF_TASK_SETUP) {
|
||||
buf = decodeUdfSetupRequest(buf, &request->setup);
|
||||
} else if (request->type == UDF_TASK_CALL) {
|
||||
buf = decodeUdfCallRequest(buf, &request->call);
|
||||
} else if (request->type == UDF_TASK_TEARDOWN) {
|
||||
buf = decodeUdfTeardownRequest(buf, &request->teardown);
|
||||
}
|
||||
|
||||
*(int32_t*)bufBegin = buf - bufBegin;
|
||||
return buf - bufBegin;
|
||||
return (void*)buf;
|
||||
}
|
||||
|
||||
int32_t deserializeUdfResponse(SUdfResponse *rsp, char *buf) {
|
||||
char* bufBegin = buf;
|
||||
rsp->msgLen = *(int32_t *) buf;
|
||||
buf += sizeof(int32_t);
|
||||
rsp->seqNum = *(int64_t *) buf;
|
||||
buf += sizeof(int64_t);
|
||||
rsp->type = *(int8_t *) buf;
|
||||
buf += sizeof(int8_t);
|
||||
rsp->code = *(int32_t *) buf;
|
||||
buf += sizeof(int32_t);
|
||||
int32_t l = 0;
|
||||
int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) {
|
||||
int32_t len = 0;
|
||||
len += taosEncodeFixedI64(buf, setupRsp->udfHandle);
|
||||
return len;
|
||||
}
|
||||
|
||||
void* decodeUdfSetupResponse(const void* buf, SUdfSetupResponse* setupRsp) {
|
||||
buf = taosDecodeFixedI64(buf, &setupRsp->udfHandle);
|
||||
return (void*)buf;
|
||||
}
|
||||
|
||||
int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp) {
|
||||
int32_t len = 0;
|
||||
len += taosEncodeFixedI8(buf, callRsp->callType);
|
||||
switch (callRsp->callType) {
|
||||
case TSDB_UDF_CALL_SCALA_PROC:
|
||||
len += tEncodeDataBlock(buf, &callRsp->resultData);
|
||||
break;
|
||||
case TSDB_UDF_CALL_AGG_INIT:
|
||||
len += encodeUdfInterBuf(buf, &callRsp->interBuf);
|
||||
break;
|
||||
case TSDB_UDF_CALL_AGG_PROC:
|
||||
len += encodeUdfInterBuf(buf, &callRsp->interBuf);
|
||||
break;
|
||||
case TSDB_UDF_CALL_AGG_MERGE:
|
||||
len += encodeUdfInterBuf(buf, &callRsp->interBuf);
|
||||
break;
|
||||
case TSDB_UDF_CALL_AGG_FIN:
|
||||
len += tEncodeDataBlock(buf, &callRsp->resultData);
|
||||
break;
|
||||
}
|
||||
return len;
|
||||
}
|
||||
|
||||
void* decodeUdfCallResponse(const void* buf, SUdfCallResponse* callRsp) {
|
||||
buf = taosDecodeFixedI8(buf, &callRsp->callType);
|
||||
switch (callRsp->callType) {
|
||||
case TSDB_UDF_CALL_SCALA_PROC:
|
||||
buf = tDecodeDataBlock(buf, &callRsp->resultData);
|
||||
break;
|
||||
case TSDB_UDF_CALL_AGG_INIT:
|
||||
buf = decodeUdfInterBuf(buf, &callRsp->interBuf);
|
||||
break;
|
||||
case TSDB_UDF_CALL_AGG_PROC:
|
||||
buf = decodeUdfInterBuf(buf, &callRsp->interBuf);
|
||||
break;
|
||||
case TSDB_UDF_CALL_AGG_MERGE:
|
||||
buf = decodeUdfInterBuf(buf, &callRsp->interBuf);
|
||||
break;
|
||||
case TSDB_UDF_CALL_AGG_FIN:
|
||||
buf = tDecodeDataBlock(buf, &callRsp->resultData);
|
||||
break;
|
||||
}
|
||||
return (void*)buf;
|
||||
}
|
||||
|
||||
int32_t encodeUdfTeardownResponse(void** buf, const SUdfTeardownResponse* teardownRsp) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
void* decodeUdfTeardownResponse(const void* buf, SUdfTeardownResponse* teardownResponse) {
|
||||
return (void*)buf;
|
||||
}
|
||||
|
||||
int32_t encodeUdfResponse(void** buf, const SUdfResponse* rsp) {
|
||||
int32_t len = 0;
|
||||
if (buf == NULL) {
|
||||
len += sizeof(rsp->msgLen);
|
||||
} else {
|
||||
*(int32_t*)(*buf) = rsp->msgLen;
|
||||
*buf = POINTER_SHIFT(*buf, sizeof(rsp->msgLen));
|
||||
}
|
||||
|
||||
len += taosEncodeFixedI64(buf, rsp->seqNum);
|
||||
len += taosEncodeFixedI8(buf, rsp->type);
|
||||
len += taosEncodeFixedI32(buf, rsp->code);
|
||||
|
||||
switch (rsp->type) {
|
||||
case UDF_TASK_SETUP: {
|
||||
l = deserializeUdfSetupResponse(&rsp->setupRsp, buf);
|
||||
buf += l;
|
||||
case UDF_TASK_SETUP:
|
||||
len += encodeUdfSetupResponse(buf, &rsp->setupRsp);
|
||||
break;
|
||||
}
|
||||
case UDF_TASK_CALL: {
|
||||
l = deserializeUdfCallResponse(&rsp->callRsp, buf);
|
||||
buf += l;
|
||||
case UDF_TASK_CALL:
|
||||
len += encodeUdfCallResponse(buf, &rsp->callRsp);
|
||||
break;
|
||||
case UDF_TASK_TEARDOWN:
|
||||
len += encodeUdfTeardownResponse(buf, &rsp->teardownRsp);
|
||||
break;
|
||||
default:
|
||||
//TODO: log error
|
||||
break;
|
||||
}
|
||||
case UDF_TASK_TEARDOWN: {
|
||||
l = deserializeUdfTeardownResponse(&rsp->teardownRsp, buf);
|
||||
buf += l;
|
||||
}
|
||||
}
|
||||
int32_t total = buf - bufBegin;
|
||||
if (total != rsp->msgLen) {
|
||||
debugPrint("decode response error");
|
||||
return -1;
|
||||
}
|
||||
return buf - bufBegin;
|
||||
return len;
|
||||
}
|
||||
|
||||
int32_t estimateUdfRequestLen(SUdfRequest *request) {
|
||||
// a larger estimated is generated
|
||||
int32_t size = sizeof(SUdfRequest);
|
||||
if (request->type == UDF_TASK_CALL) {
|
||||
size += request->call.block.numOfCols * sizeof(SUdfColumn*);
|
||||
for (int32_t i = 0; i < request->call.block.numOfCols; ++i) {
|
||||
size += sizeof(SUdfColumn);
|
||||
SUdfColumn* col = request->call.block.udfCols[i];
|
||||
if (col->colData.varLengthColumn) {
|
||||
size += col->colData.varOffsetsLen;
|
||||
size += col->colData.payloadLen;
|
||||
} else {
|
||||
size += col->colData.nullBitmapLen;
|
||||
size += col->colData.dataLen;
|
||||
}
|
||||
}
|
||||
size += request->call.interBuf.bufLen;
|
||||
void* decodeUdfResponse(const void* buf, SUdfResponse* rsp) {
|
||||
rsp->msgLen = *(int32_t*)(buf);
|
||||
POINTER_SHIFT(buf, sizeof(rsp->msgLen));
|
||||
buf = taosDecodeFixedI64(buf, &rsp->seqNum);
|
||||
buf = taosDecodeFixedI8(buf, &rsp->type);
|
||||
buf = taosDecodeFixedI32(buf, &rsp->code);
|
||||
|
||||
switch (rsp->type) {
|
||||
case UDF_TASK_SETUP:
|
||||
buf = decodeUdfSetupResponse(buf, &rsp->setupRsp);
|
||||
break;
|
||||
case UDF_TASK_CALL:
|
||||
buf = decodeUdfCallResponse(buf, &rsp->callRsp);
|
||||
break;
|
||||
case UDF_TASK_TEARDOWN:
|
||||
buf = decodeUdfTeardownResponse(buf, &rsp->teardownRsp);
|
||||
break;
|
||||
default:
|
||||
//TODO: log error
|
||||
break;
|
||||
}
|
||||
return size;
|
||||
}
|
||||
|
||||
int32_t estimateUdfResponseLen(SUdfResponse *response) {
|
||||
int32_t size = sizeof(SUdfResponse);
|
||||
if (response->type == UDF_TASK_CALL) {
|
||||
size += response->callRsp.interBuf.bufLen;
|
||||
SUdfColumnData *resultData = &response->callRsp.resultData;
|
||||
if (!resultData->varLengthColumn) {
|
||||
size += resultData->nullBitmapLen;
|
||||
size += resultData->dataLen;
|
||||
} else {
|
||||
size += resultData->varOffsetsLen;
|
||||
size += resultData->payloadLen;
|
||||
}
|
||||
}
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
int32_t encodeRequest(char **pBuf, int32_t *pBufLen, SUdfRequest *request) {
|
||||
debugPrint("%s", "encoding request");
|
||||
|
||||
int len = estimateUdfRequestLen(request);
|
||||
|
||||
char *bufBegin = taosMemoryMalloc(len);
|
||||
char *buf = bufBegin;
|
||||
serializeUdfRequest(request, buf);
|
||||
*pBuf = bufBegin;
|
||||
*pBufLen = request->msgLen;
|
||||
debugPrint("\tLen: estimate: %d, actual:%d", len, *pBufLen);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t decodeRequest(char *bufMsg, int32_t bufLen, SUdfRequest *pRequest) {
|
||||
debugPrint("%s", "decoding request");
|
||||
if (*(int32_t *) bufMsg != bufLen) {
|
||||
debugPrint("%s", "decoding request error");
|
||||
return -1;
|
||||
}
|
||||
char *buf = bufMsg;
|
||||
deserializeUdfRequest(pRequest, buf);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t encodeResponse(char **pBuf, int32_t *pBufLen, SUdfResponse *response) {
|
||||
debugPrint("%s", "encoding response");
|
||||
int32_t len = estimateUdfResponseLen(response);
|
||||
|
||||
char *bufBegin = taosMemoryMalloc(len);
|
||||
char *buf = bufBegin;
|
||||
serializeUdfResponse(response, buf);
|
||||
*pBuf = bufBegin;
|
||||
*pBufLen = response->msgLen;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t decodeResponse(char *bufMsg, int32_t bufLen, SUdfResponse *rsp) {
|
||||
debugPrint("%s", "decoding response");
|
||||
|
||||
if (*(int32_t *) bufMsg != bufLen) {
|
||||
debugPrint("%s", "can not decode response");
|
||||
return -1;
|
||||
}
|
||||
char *buf = bufMsg;
|
||||
deserializeUdfResponse(rsp, buf);
|
||||
return 0;
|
||||
return (void*)buf;
|
||||
}
|
||||
|
||||
void freeUdfColumnData(SUdfColumnData *data) {
|
||||
|
@ -770,7 +504,8 @@ int32_t udfcGetUvTaskResponseResult(SClientUdfTask *task, SClientUvTaskNode *uvT
|
|||
if (uvTask->type == UV_TASK_REQ_RSP) {
|
||||
if (uvTask->rspBuf.base != NULL) {
|
||||
SUdfResponse rsp;
|
||||
decodeResponse(uvTask->rspBuf.base, uvTask->rspBuf.len, &rsp);
|
||||
void* buf = decodeUdfResponse(uvTask->rspBuf.base, &rsp);
|
||||
assert(uvTask->rspBuf.len == POINTER_DISTANCE(buf, uvTask->rspBuf.base));
|
||||
task->errCode = rsp.code;
|
||||
|
||||
switch (task->type) {
|
||||
|
@ -973,9 +708,10 @@ int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN
|
|||
} else {
|
||||
//TODO log and return error
|
||||
}
|
||||
char *buf = NULL;
|
||||
int32_t bufLen = 0;
|
||||
encodeRequest(&buf, &bufLen, &request);
|
||||
int32_t bufLen = encodeUdfRequest(NULL, &request);
|
||||
request.msgLen = bufLen;
|
||||
void *buf = taosMemoryMalloc(bufLen);
|
||||
encodeUdfRequest(&buf, &request);
|
||||
uvTask->reqBuf = uv_buf_init(buf, bufLen);
|
||||
uvTask->seqNum = request.seqNum;
|
||||
} else if (uvTaskType == UV_TASK_DISCONNECT) {
|
||||
|
@ -1226,8 +962,8 @@ int32_t setupUdf(char udfName[], SEpSet *epSet, UdfHandle *handle) {
|
|||
return err;
|
||||
}
|
||||
|
||||
int32_t callUdf(UdfHandle handle, int8_t callType, SUdfDataBlock *input, SUdfInterBuf *state,
|
||||
SUdfColumnData* output, SUdfInterBuf *newState, bool initFirst) {
|
||||
int32_t callUdf(UdfHandle handle, int8_t callType, int8_t initFirst, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
|
||||
SSDataBlock* output, SUdfInterBuf *newState) {
|
||||
debugPrint("%s", "client call udf");
|
||||
|
||||
SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask));
|
||||
|
@ -1237,14 +973,21 @@ int32_t callUdf(UdfHandle handle, int8_t callType, SUdfDataBlock *input, SUdfInt
|
|||
|
||||
SUdfCallRequest *req = &task->_call.req;
|
||||
switch (callType) {
|
||||
case TSDB_UDF_CALL_AGG_INIT: {
|
||||
req->initFirst = 1;
|
||||
break;
|
||||
}
|
||||
case TSDB_UDF_CALL_AGG_PROC: {
|
||||
req->block = *input;
|
||||
req->interBuf = *state;
|
||||
req->initFirst = initFirst;
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDb_UDF_CALL_AGG_FIN: {
|
||||
case TSDB_UDF_CALL_AGG_MERGE: {
|
||||
req->interBuf = *state;
|
||||
req->interBuf2 = *state2;
|
||||
break;
|
||||
}
|
||||
case TSDB_UDF_CALL_AGG_FIN: {
|
||||
req->interBuf = *state;
|
||||
break;
|
||||
}
|
||||
|
@ -1258,12 +1001,19 @@ int32_t callUdf(UdfHandle handle, int8_t callType, SUdfDataBlock *input, SUdfInt
|
|||
|
||||
SUdfCallResponse *rsp = &task->_call.rsp;
|
||||
switch (callType) {
|
||||
case TSDB_UDF_CALL_AGG_INIT: {
|
||||
*newState = rsp->interBuf;
|
||||
break;
|
||||
}
|
||||
case TSDB_UDF_CALL_AGG_PROC: {
|
||||
*newState = rsp->interBuf;
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDb_UDF_CALL_AGG_FIN: {
|
||||
case TSDB_UDF_CALL_AGG_MERGE: {
|
||||
*newState = rsp->interBuf;
|
||||
break;
|
||||
}
|
||||
case TSDB_UDF_CALL_AGG_FIN: {
|
||||
*output = rsp->resultData;
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -62,7 +62,7 @@ typedef struct SUdfHandle {
|
|||
void udfdProcessRequest(uv_work_t *req) {
|
||||
SUvUdfWork *uvUdf = (SUvUdfWork *) (req->data);
|
||||
SUdfRequest request = {0};
|
||||
decodeRequest(uvUdf->input.base, uvUdf->input.len, &request);
|
||||
decodeUdfRequest(uvUdf->input.base, &request);
|
||||
|
||||
switch (request.type) {
|
||||
case UDF_TASK_SETUP: {
|
||||
|
@ -94,9 +94,10 @@ void udfdProcessRequest(uv_work_t *req) {
|
|||
rsp.type = request.type;
|
||||
rsp.code = 0;
|
||||
rsp.setupRsp.udfHandle = (int64_t) (handle);
|
||||
char *buf;
|
||||
int32_t len;
|
||||
encodeResponse(&buf, &len, &rsp);
|
||||
int32_t len = encodeUdfResponse(NULL, &rsp);
|
||||
rsp.msgLen = len;
|
||||
void *buf = taosMemoryMalloc(len);
|
||||
encodeUdfResponse(&buf, &rsp);
|
||||
|
||||
uvUdf->output = uv_buf_init(buf, len);
|
||||
|
||||
|
@ -110,7 +111,8 @@ void udfdProcessRequest(uv_work_t *req) {
|
|||
SUdfHandle *handle = (SUdfHandle *) (call->udfHandle);
|
||||
SUdf *udf = handle->udf;
|
||||
|
||||
SUdfDataBlock input = call->block;
|
||||
SUdfDataBlock input = {0};
|
||||
//TODO: convertSDataBlockToUdfDataBlock(call->block, &input);
|
||||
SUdfColumnData output;
|
||||
//TODO: call different functions according to call type, for now just calar
|
||||
if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
|
||||
|
@ -124,12 +126,13 @@ void udfdProcessRequest(uv_work_t *req) {
|
|||
rsp->type = request.type;
|
||||
rsp->code = 0;
|
||||
SUdfCallResponse *subRsp = &rsp->callRsp;
|
||||
subRsp->resultData = output;
|
||||
//TODO: convertSUdfColumnDataToSSDataBlock(output, &subRsp->resultData);
|
||||
}
|
||||
|
||||
char *buf;
|
||||
int32_t len;
|
||||
encodeResponse(&buf, &len, rsp);
|
||||
int32_t len = encodeUdfResponse(NULL, rsp);
|
||||
rsp->msgLen = len;
|
||||
void *buf = taosMemoryMalloc(len);
|
||||
encodeUdfResponse(&buf, rsp);
|
||||
uvUdf->output = uv_buf_init(buf, len);
|
||||
|
||||
//TODO: free
|
||||
|
@ -158,9 +161,10 @@ void udfdProcessRequest(uv_work_t *req) {
|
|||
rsp->type = request.type;
|
||||
rsp->code = 0;
|
||||
SUdfTeardownResponse *subRsp = &response.teardownRsp;
|
||||
char *buf;
|
||||
int32_t len;
|
||||
encodeResponse(&buf, &len, rsp);
|
||||
int32_t len = encodeUdfResponse(NULL, rsp);
|
||||
void *buf = taosMemoryMalloc(len);
|
||||
rsp->msgLen = len;
|
||||
encodeUdfResponse(&buf, rsp);
|
||||
uvUdf->output = uv_buf_init(buf, len);
|
||||
|
||||
taosMemoryFree(uvUdf->input.base);
|
||||
|
|
Loading…
Reference in New Issue