diff --git a/source/libs/function/inc/tudf.h b/source/libs/function/inc/tudf.h index 45e4b2a0f5..683274f3ae 100644 --- a/source/libs/function/inc/tudf.h +++ b/source/libs/function/inc/tudf.h @@ -20,6 +20,7 @@ #include #include #include "tmsg.h" +#include "tcommon.h" #ifdef __cplusplus extern "C" { @@ -97,18 +98,20 @@ typedef struct SUdfInterBuf { char* buf; } SUdfInterBuf; +//TODO: translate these calls to callUdf +int32_t callUdfAggInit(SUdfInterBuf *interBuf); // input: block, initFirst // output: interbuf -int32_t callUdfAggProcess(SUdfDataBlock *block, SUdfInterBuf *interBuf, bool initFirst); +int32_t callUdfAggProcess(SSDataBlock *block, SUdfInterBuf *interBuf); // input: interBuf // output: resultData -int32_t callUdfAggFinalize(SUdfInterBuf *interBuf, SUdfColumnData *resultData); +int32_t callUdfAggFinalize(SUdfInterBuf *interBuf, SSDataBlock *resultData); // input: interbuf1, interbuf2 // output: resultBuf int32_t callUdfAggMerge(SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf); // input: block // output: resultData -int32_t callUdfScalaProcess(SUdfDataBlock *block, SUdfColumnData *resultData); +int32_t callUdfScalaProcess(SSDataBlock *block, SSDataBlock *resultData); /** * tearn down udf @@ -125,6 +128,7 @@ int32_t teardownUdf(UdfHandle handle); typedef int32_t (*TUdfSetupFunc)(); typedef int32_t (*TUdfTeardownFunc)(); +//TODO: add API to check function arguments type, number etc. //TODO: another way to manage memory is provide api for UDF to add data to SUdfColumnData and UDF framework will allocate memory. // then UDF framework will free the memory //typedef int32_t addFixedLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t colBytes, char* data); diff --git a/source/libs/function/inc/tudfInt.h b/source/libs/function/inc/tudfInt.h index 7890eef8be..6c3da8cd89 100644 --- a/source/libs/function/inc/tudfInt.h +++ b/source/libs/function/inc/tudfInt.h @@ -30,9 +30,10 @@ enum { }; enum { - TSDB_UDF_CALL_AGG_PROC = 0, + TSDB_UDF_CALL_AGG_INIT = 0, + TSDB_UDF_CALL_AGG_PROC, TSDB_UDF_CALL_AGG_MERGE, - TSDb_UDF_CALL_AGG_FIN, + TSDB_UDF_CALL_AGG_FIN, TSDB_UDF_CALL_SCALA_PROC, }; @@ -49,14 +50,15 @@ typedef struct SUdfCallRequest { int64_t udfHandle; int8_t callType; - SUdfDataBlock block; + SSDataBlock block; SUdfInterBuf interBuf; SUdfInterBuf interBuf2; - bool initFirst; + int8_t initFirst; } SUdfCallRequest; typedef struct SUdfCallResponse { - SUdfColumnData resultData; + int8_t callType; + SSDataBlock resultData; SUdfInterBuf interBuf; } SUdfCallResponse; @@ -94,10 +96,11 @@ typedef struct SUdfResponse { }; } SUdfResponse; -int32_t decodeRequest(char *buf, int32_t bufLen, SUdfRequest *pRequest); -int32_t encodeRequest(char **buf, int32_t *bufLen, SUdfRequest *request); -int32_t decodeResponse(char *buf, int32_t bufLen, SUdfResponse *pResponse); -int32_t encodeResponse(char **buf, int32_t *bufLen, SUdfResponse *response); +int32_t encodeUdfRequest(void **buf, const SUdfRequest* request); +void* decodeUdfRequest(const void *buf, SUdfRequest* request); + +int32_t encodeUdfResponse(void **buf, const SUdfResponse *response); +void* decodeUdfResponse(const void* buf, SUdfResponse *response); void freeUdfColumnData(SUdfColumnData *data); void freeUdfColumn(SUdfColumn* col); diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 283dfd06bb..c8601c69c8 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -213,507 +213,241 @@ QUEUE gUdfTaskQueue = {0}; QUEUE gUvProcTaskQueue = {0}; -int32_t serializeUdfColumnData(SUdfColumnData* data, char* pBuf) { - char* bufBeg = pBuf; - *(int32_t*)pBuf = data->numOfRows; - pBuf += sizeof(int32_t); - *(bool*)pBuf = data->varLengthColumn; - pBuf += sizeof(bool); +int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) { + int32_t len = 0; + len += taosEncodeBinary(buf, setup->udfName, TSDB_FUNC_NAME_LEN); + len += taosEncodeSEpSet(buf, &setup->epSet); + return len; +} - if (!data->varLengthColumn) { - *(int32_t*)pBuf = data->nullBitmapLen; - pBuf += sizeof(int32_t); - memcpy(pBuf, data->nullBitmap, data->nullBitmapLen); - pBuf += data->nullBitmapLen; - *(int32_t*)pBuf = data->dataLen; - pBuf += sizeof(int32_t); - memcpy(pBuf, data->data, data->dataLen); - pBuf += data->dataLen; +void* decodeUdfSetupRequest(const void* buf, SUdfSetupRequest *request) { + buf = taosDecodeBinaryTo(buf, request->udfName, TSDB_FUNC_NAME_LEN); + buf = taosDecodeSEpSet((void*)buf, &request->epSet); + return (void*)buf; +} + +int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf* state) { + int32_t len = 0; + len += taosEncodeFixedI32(buf, state->bufLen); + len += taosEncodeBinary(buf, state->buf, state->bufLen); + return len; +} + +void* decodeUdfInterBuf(const void* buf, SUdfInterBuf* state) { + buf = taosDecodeFixedI32(buf, &state->bufLen); + buf = taosDecodeBinary(buf, (void**)&state->buf, state->bufLen); + return (void*)buf; +} + +int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call) { + int32_t len = 0; + len += taosEncodeFixedI64(buf, call->udfHandle); + len += taosEncodeFixedI8(buf, call->callType); + if (call->callType == TSDB_UDF_CALL_SCALA_PROC) { + len += tEncodeDataBlock(buf, &call->block); + } else if (call->callType == TSDB_UDF_CALL_AGG_INIT) { + len += taosEncodeFixedI8(buf, call->initFirst); + } else if (call->callType == TSDB_UDF_CALL_AGG_PROC) { + len += tEncodeDataBlock(buf, &call->block); + len += encodeUdfInterBuf(buf, &call->interBuf); + } else if (call->callType == TSDB_UDF_CALL_AGG_MERGE) { + len += encodeUdfInterBuf(buf, &call->interBuf); + len += encodeUdfInterBuf(buf, &call->interBuf2); + } else if (call->callType == TSDB_UDF_CALL_AGG_FIN) { + len += encodeUdfInterBuf(buf, &call->interBuf); + } + return len; +} + +void* decodeUdfCallRequest(const void* buf, SUdfCallRequest* call) { + buf = taosDecodeFixedI64(buf, &call->udfHandle); + buf = taosDecodeFixedI8(buf, &call->callType); + switch (call->callType) { + case TSDB_UDF_CALL_SCALA_PROC: + buf = tDecodeDataBlock(buf, &call->block); + break; + case TSDB_UDF_CALL_AGG_INIT: + buf = taosDecodeFixedI8(buf, &call->initFirst); + break; + case TSDB_UDF_CALL_AGG_PROC: + buf = tDecodeDataBlock(buf, &call->block); + buf = decodeUdfInterBuf(buf, &call->interBuf); + break; + case TSDB_UDF_CALL_AGG_MERGE: + buf = decodeUdfInterBuf(buf, &call->interBuf); + buf = decodeUdfInterBuf(buf, &call->interBuf2); + break; + case TSDB_UDF_CALL_AGG_FIN: + buf = decodeUdfInterBuf(buf, &call->interBuf); + break; + } + return (void*)buf; +} + +int32_t encodeUdfTeardownRequest(void **buf, const SUdfTeardownRequest *teardown) { + int32_t len = 0; + len += taosEncodeFixedI64(buf, teardown->udfHandle); + return len; +} + +void* decodeUdfTeardownRequest(const void* buf, SUdfTeardownRequest *teardown) { + buf = taosDecodeFixedI64(buf, &teardown->udfHandle); + return (void*)buf; +} + +int32_t encodeUdfRequest(void** buf, const SUdfRequest* request) { + int32_t len = 0; + if (buf == NULL) { + len += sizeof(request->msgLen); } else { - *(int32_t*)pBuf = data->varOffsetsLen; - pBuf += sizeof(int32_t); - memcpy(pBuf, data->varOffsets, data->varOffsetsLen); - pBuf += data->varOffsetsLen; - *(int32_t*)pBuf = data->payloadLen; - pBuf += sizeof(int32_t); - memcpy(pBuf, data->payload, data->payloadLen); - pBuf += data->payloadLen; + *(int32_t*)(*buf) = request->msgLen; + *buf = POINTER_SHIFT(*buf, sizeof(request->msgLen)); } - int32_t len = pBuf - bufBeg; - return len; -} - -int32_t deserializeUdfColumnData(SUdfColumnData* pData, char* buf) { - char* bufBeg = buf; - pData->numOfRows = *(int32_t*)buf; - buf += sizeof(int32_t); - pData->varLengthColumn = *(bool*)buf; - buf += sizeof(bool); - - if (!pData->varLengthColumn) { - pData->nullBitmapLen = *(int32_t*)buf; - buf += sizeof(int32_t); - - //TODO: optimize for less memory copy - pData->nullBitmap = taosMemoryMalloc(pData->nullBitmapLen); - memcpy(pData->nullBitmap, buf, pData->nullBitmapLen); - buf += pData->nullBitmapLen; - - pData->dataLen = *(int32_t*)buf; - buf += sizeof(int32_t); - - pData->data = taosMemoryMalloc(pData->dataLen); - memcpy(pData->data, buf, pData->dataLen); - buf += pData->dataLen; - } else { - pData->varOffsetsLen = *(int32_t*)buf; - buf += sizeof(int32_t); - - pData->varOffsets = taosMemoryMalloc(pData->varOffsetsLen); - memcpy(pData->varOffsets, buf, pData->varOffsetsLen); - buf += pData->varOffsetsLen; - - pData->payloadLen = *(int32_t*)buf; - buf += sizeof(int32_t); - - pData->payload = taosMemoryMalloc(pData->payloadLen); - memcpy(pData->payload, buf, pData->payloadLen); - buf += pData->payloadLen; - } - int32_t len = buf - bufBeg; - return len; -} - -int32_t serializeUdfColumnMeta(SUdfColumnMeta *meta, char* pBuf) { - char* bufBeg = pBuf; - memcpy(pBuf, meta, sizeof(SUdfColumnMeta)); - pBuf += sizeof(SUdfColumnMeta); - - int32_t len = pBuf - bufBeg; - return len; -} - -int32_t deserializeUdfColumnMeta(SUdfColumnMeta *pMeta, char* buf) { - char *bufBegin = buf; - memcpy(pMeta, buf, sizeof(SUdfColumnMeta)); - buf += sizeof(SUdfColumnMeta); - - int32_t len = buf - bufBegin; - return len; -} - -int32_t serializeUdfColumn(SUdfColumn *udfCol, char *pBuf) { - char *bufBegin = pBuf; - - int32_t len = serializeUdfColumnMeta(&udfCol->colMeta, pBuf); - pBuf += len; - - len = serializeUdfColumnData(&udfCol->colData, pBuf); - pBuf += len; - - int32_t totalLen = pBuf - bufBegin; - return totalLen; -} - -int32_t deserializeUdfColumn(SUdfColumn *pUdfCol, char *buf) { - char *bufBegin = buf; - - int32_t len = deserializeUdfColumnMeta(&pUdfCol->colMeta, buf); - buf += len; - - len = deserializeUdfColumnData(&pUdfCol->colData, buf); - buf += len; - - int32_t totalLen = buf - bufBegin; - return totalLen; -} - -int32_t serializeUdfDataBlock(SUdfDataBlock *block, char *pBuf) { - char *bufBegin = pBuf; - - *(int32_t*)pBuf = block->numOfRows; - pBuf += sizeof(int32_t); - - *(int32_t*)pBuf = block->numOfCols; - pBuf += sizeof(int32_t); - - for (int32_t i = 0; i < block->numOfCols; ++i) { - SUdfColumn* col = block->udfCols[i]; - int32_t l = serializeUdfColumn(col, pBuf); - pBuf += l; - } - - int32_t totalLen = pBuf - bufBegin; - return totalLen; -} - -int32_t deserializeUdfDataBlock(SUdfDataBlock *pBlock, char *buf) { - char *bufBegin = buf; - - pBlock->numOfRows = *(int32_t*)buf; - buf += sizeof(int32_t); - - pBlock->numOfCols = *(int32_t*)buf; - buf += sizeof(int32_t); - - pBlock->udfCols = taosMemoryMalloc(sizeof(SUdfColumn*) * pBlock->numOfCols); - for (int32_t i = 0; i < pBlock->numOfCols; ++i) { - pBlock->udfCols[i] = taosMemoryMalloc(sizeof(SUdfColumn)); - int32_t l = deserializeUdfColumn(pBlock->udfCols[i], buf); - buf += l; - } - - int32_t totalLen = buf - bufBegin; - return totalLen; -} - -int32_t serializeUdfInterBuf(SUdfInterBuf *state, char *pBuf) { - char *bufBegin = pBuf; - - *(int32_t*)pBuf = state->bufLen; - pBuf += sizeof(int32_t); - memcpy(pBuf, state->buf, state->bufLen); - pBuf += state->bufLen; - - return pBuf - bufBegin; -} - -int32_t deserializeUdfInterBuf(SUdfInterBuf *pState, char *buf) { - char* bufBegin = buf; - pState->bufLen = *(int32_t*)buf; - buf += sizeof(int32_t); - - pState->buf = taosMemoryMalloc(pState->bufLen); - memcpy(pState->buf, buf, pState->bufLen); - buf += pState->bufLen; - return buf - bufBegin; -} - -int32_t serializeUdfSetupRequest(SUdfSetupRequest *setup, char *buf) { - char *bufBegin = buf; - - memcpy(buf, setup->udfName, TSDB_FUNC_NAME_LEN); - buf += TSDB_FUNC_NAME_LEN; - - memcpy(buf, &setup->epSet, sizeof(SEpSet)); - buf += sizeof(SEpSet); - - return buf - bufBegin; -}; - -int32_t deserializeUdfSetupRequest(SUdfSetupRequest *setup, char *buf) { - char* bufBegin = buf; - - memcpy(setup->udfName, buf, TSDB_FUNC_NAME_LEN); - buf += TSDB_FUNC_NAME_LEN; - - memcpy(&setup->epSet, buf, sizeof(SEpSet)); - buf += sizeof(SEpSet); - return buf - bufBegin; -} - -int32_t serializeUdfTeardownRequest(SUdfTeardownRequest *teardown, char *buf) { - char* bufBegin = buf; - *(int64_t *) buf = teardown->udfHandle; - buf += sizeof(int64_t); - - return buf - bufBegin; -} - -int32_t deserializeUdfTeardownRequest(SUdfTeardownRequest *teardown, char *buf) { - char *bufBegin = buf; - teardown->udfHandle = *(int64_t *) buf; - buf += sizeof(int64_t); - return buf - bufBegin; -} - -int32_t serializeUdfSetupResponse(SUdfSetupResponse *setupRsp, char *buf) { - char *bufBegin = buf; - - *(int64_t *) buf = setupRsp->udfHandle; - buf += sizeof(int64_t); - - return buf-bufBegin; -} - -int32_t deserializeUdfSetupResponse(SUdfSetupResponse *setupRsp, char *buf) { - char *bufBegin = buf; - setupRsp->udfHandle = *(int64_t *) buf; - buf += sizeof(int64_t); - return buf-bufBegin; -} - -int32_t serializeUdfTeardownResponse(SUdfTeardownResponse *teardownRsp, char *buf) { - return 0; -} - -int32_t deserializeUdfTeardownResponse(SUdfTeardownResponse *teardownRsp, char *buf) { - return 0; -} - -int32_t serializeUdfCallRequest(SUdfCallRequest *call, char *buf) { - char* bufBegin = buf; - *(int64_t *) buf = call->udfHandle; - buf += sizeof(int64_t); - *(int8_t *) buf = call->callType; - buf += sizeof(int8_t); - int32_t l = 0; - l = serializeUdfDataBlock(&call->block, buf); - buf += l; - l = serializeUdfInterBuf(&call->interBuf, buf); - buf += l; - l = serializeUdfInterBuf(&call->interBuf2, buf); - buf += l; - - *(bool*)buf = call->initFirst; - buf += sizeof(bool); - - return buf - bufBegin; -} - -int32_t deserializeUdfCallRequest(SUdfCallRequest *call, char *buf) { - char* bufBegin = buf; - call->udfHandle = *(int64_t *) buf; - buf += sizeof(int64_t); - call->callType = *(int8_t *) buf; - buf += sizeof(int8_t); - int32_t l = 0; - l = deserializeUdfDataBlock(&call->block, buf); - buf += l; - l = deserializeUdfInterBuf(&call->interBuf, buf); - buf += l; - call->initFirst = *(bool*)buf; - buf += sizeof(bool); - return buf - bufBegin; -} - -int32_t serializeUdfCallResponse(SUdfCallResponse *callRsp, char * buf) { - char *bufBegin = buf; - int32_t l = 0; - l = serializeUdfColumnData(&callRsp->resultData, buf); - buf += l; - l = serializeUdfInterBuf(&callRsp->interBuf, buf); - buf += l; - - return buf - bufBegin; -} - -int32_t deserializeUdfCallResponse(SUdfCallResponse *callRsp, char * buf) { - char *bufBegin = buf; - int32_t l = 0; - l =deserializeUdfColumnData(&callRsp->resultData, buf); - buf += l; - l = deserializeUdfInterBuf(&callRsp->interBuf, buf); - buf += l; - return buf - bufBegin; -} - -int32_t serializeUdfRequest(SUdfRequest *request, char *buf) { - char* bufBegin = buf; - //skip msglen first - buf += sizeof(int32_t); - - *(int64_t *) buf = request->seqNum; - buf += sizeof(int64_t); - *(int8_t *) buf = request->type; - buf += sizeof(int8_t); - - int32_t l = 0; - if (request->type == UDF_TASK_SETUP) { - l = serializeUdfSetupRequest(&request->setup, buf); - buf += l; - } else if (request->type == UDF_TASK_CALL) { - l = serializeUdfCallRequest(&request->call, buf); - buf += l; - } else if (request->type == UDF_TASK_TEARDOWN){ - l = serializeUdfTeardownRequest(&request->teardown, buf); - buf += l; - } - *(int32_t*)bufBegin = buf - bufBegin; - return buf - bufBegin; -} - -int32_t deserializeUdfRequest(SUdfRequest *request, char *buf) { - char* bufBegin = buf; - request->msgLen = *(int32_t *) (buf); - buf += sizeof(int32_t); - request->seqNum = *(int64_t *) (buf); - buf += sizeof(int64_t); - request->type = *(int8_t *) (buf); - buf += sizeof(int8_t); - - int32_t l = 0; + len += taosEncodeFixedI64(buf, request->seqNum); + len += taosEncodeFixedI8(buf, request->type); if (request->type == UDF_TASK_SETUP) { - l = deserializeUdfSetupRequest(&request->setup, buf); - buf += l; + len += encodeUdfSetupRequest(buf, &request->setup); } else if (request->type == UDF_TASK_CALL) { - l = deserializeUdfCallRequest(&request->call, buf); - buf += l; - } else if (request->type == UDF_TASK_TEARDOWN){ - l = deserializeUdfTeardownRequest(&request->teardown, buf); - buf += l; + len += encodeUdfCallRequest(buf, &request->call); + } else if (request->type == UDF_TASK_TEARDOWN) { + len += encodeUdfTeardownRequest(buf, &request->teardown); } - int32_t totalLen = buf-bufBegin; - if (totalLen != request->msgLen) { - debugPrint("decoding request error"); - return -1; - } - return buf - bufBegin; + return len; } -int32_t serializeUdfResponse(SUdfResponse *response, char *buf) { - char* bufBegin = buf; - //skip msgLen - buf += sizeof(int32_t); +void* decodeUdfRequest(const void* buf, SUdfRequest* request) { + request->msgLen = *(int32_t*)(buf); + POINTER_SHIFT(buf, sizeof(request->msgLen)); - *(int64_t *) buf = response->seqNum; - buf += sizeof(int64_t); - *(int8_t *) buf = response->type; - buf += sizeof(int8_t); - *(int32_t *) buf = response->code; - buf += sizeof(int32_t); - int32_t l = 0; - switch (response->type) { - case UDF_TASK_SETUP: { - l = serializeUdfSetupResponse(&response->setupRsp, buf); - buf += l; - break; - } - case UDF_TASK_CALL: { - l = serializeUdfCallResponse(&response->callRsp, buf); - buf += l; - break; - } - case UDF_TASK_TEARDOWN: { - l = serializeUdfTeardownResponse(&response->teardownRsp, buf); - buf += l; - } + buf = taosDecodeFixedI64(buf, &request->seqNum); + buf = taosDecodeFixedI8(buf, &request->type); + + if (request->type == UDF_TASK_SETUP) { + buf = decodeUdfSetupRequest(buf, &request->setup); + } else if (request->type == UDF_TASK_CALL) { + buf = decodeUdfCallRequest(buf, &request->call); + } else if (request->type == UDF_TASK_TEARDOWN) { + buf = decodeUdfTeardownRequest(buf, &request->teardown); } - - *(int32_t*)bufBegin = buf - bufBegin; - return buf - bufBegin; + return (void*)buf; } -int32_t deserializeUdfResponse(SUdfResponse *rsp, char *buf) { - char* bufBegin = buf; - rsp->msgLen = *(int32_t *) buf; - buf += sizeof(int32_t); - rsp->seqNum = *(int64_t *) buf; - buf += sizeof(int64_t); - rsp->type = *(int8_t *) buf; - buf += sizeof(int8_t); - rsp->code = *(int32_t *) buf; - buf += sizeof(int32_t); - int32_t l = 0; +int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) { + int32_t len = 0; + len += taosEncodeFixedI64(buf, setupRsp->udfHandle); + return len; +} + +void* decodeUdfSetupResponse(const void* buf, SUdfSetupResponse* setupRsp) { + buf = taosDecodeFixedI64(buf, &setupRsp->udfHandle); + return (void*)buf; +} + +int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp) { + int32_t len = 0; + len += taosEncodeFixedI8(buf, callRsp->callType); + switch (callRsp->callType) { + case TSDB_UDF_CALL_SCALA_PROC: + len += tEncodeDataBlock(buf, &callRsp->resultData); + break; + case TSDB_UDF_CALL_AGG_INIT: + len += encodeUdfInterBuf(buf, &callRsp->interBuf); + break; + case TSDB_UDF_CALL_AGG_PROC: + len += encodeUdfInterBuf(buf, &callRsp->interBuf); + break; + case TSDB_UDF_CALL_AGG_MERGE: + len += encodeUdfInterBuf(buf, &callRsp->interBuf); + break; + case TSDB_UDF_CALL_AGG_FIN: + len += tEncodeDataBlock(buf, &callRsp->resultData); + break; + } + return len; +} + +void* decodeUdfCallResponse(const void* buf, SUdfCallResponse* callRsp) { + buf = taosDecodeFixedI8(buf, &callRsp->callType); + switch (callRsp->callType) { + case TSDB_UDF_CALL_SCALA_PROC: + buf = tDecodeDataBlock(buf, &callRsp->resultData); + break; + case TSDB_UDF_CALL_AGG_INIT: + buf = decodeUdfInterBuf(buf, &callRsp->interBuf); + break; + case TSDB_UDF_CALL_AGG_PROC: + buf = decodeUdfInterBuf(buf, &callRsp->interBuf); + break; + case TSDB_UDF_CALL_AGG_MERGE: + buf = decodeUdfInterBuf(buf, &callRsp->interBuf); + break; + case TSDB_UDF_CALL_AGG_FIN: + buf = tDecodeDataBlock(buf, &callRsp->resultData); + break; + } + return (void*)buf; +} + +int32_t encodeUdfTeardownResponse(void** buf, const SUdfTeardownResponse* teardownRsp) { + return 0; +} + +void* decodeUdfTeardownResponse(const void* buf, SUdfTeardownResponse* teardownResponse) { + return (void*)buf; +} + +int32_t encodeUdfResponse(void** buf, const SUdfResponse* rsp) { + int32_t len = 0; + if (buf == NULL) { + len += sizeof(rsp->msgLen); + } else { + *(int32_t*)(*buf) = rsp->msgLen; + *buf = POINTER_SHIFT(*buf, sizeof(rsp->msgLen)); + } + + len += taosEncodeFixedI64(buf, rsp->seqNum); + len += taosEncodeFixedI8(buf, rsp->type); + len += taosEncodeFixedI32(buf, rsp->code); + switch (rsp->type) { - case UDF_TASK_SETUP: { - l = deserializeUdfSetupResponse(&rsp->setupRsp, buf); - buf += l; + case UDF_TASK_SETUP: + len += encodeUdfSetupResponse(buf, &rsp->setupRsp); break; - } - case UDF_TASK_CALL: { - l = deserializeUdfCallResponse(&rsp->callRsp, buf); - buf += l; + case UDF_TASK_CALL: + len += encodeUdfCallResponse(buf, &rsp->callRsp); + break; + case UDF_TASK_TEARDOWN: + len += encodeUdfTeardownResponse(buf, &rsp->teardownRsp); + break; + default: + //TODO: log error break; - } - case UDF_TASK_TEARDOWN: { - l = deserializeUdfTeardownResponse(&rsp->teardownRsp, buf); - buf += l; - } } - int32_t total = buf - bufBegin; - if (total != rsp->msgLen) { - debugPrint("decode response error"); - return -1; - } - return buf - bufBegin; + return len; } -int32_t estimateUdfRequestLen(SUdfRequest *request) { - // a larger estimated is generated - int32_t size = sizeof(SUdfRequest); - if (request->type == UDF_TASK_CALL) { - size += request->call.block.numOfCols * sizeof(SUdfColumn*); - for (int32_t i = 0; i < request->call.block.numOfCols; ++i) { - size += sizeof(SUdfColumn); - SUdfColumn* col = request->call.block.udfCols[i]; - if (col->colData.varLengthColumn) { - size += col->colData.varOffsetsLen; - size += col->colData.payloadLen; - } else { - size += col->colData.nullBitmapLen; - size += col->colData.dataLen; - } - } - size += request->call.interBuf.bufLen; +void* decodeUdfResponse(const void* buf, SUdfResponse* rsp) { + rsp->msgLen = *(int32_t*)(buf); + POINTER_SHIFT(buf, sizeof(rsp->msgLen)); + buf = taosDecodeFixedI64(buf, &rsp->seqNum); + buf = taosDecodeFixedI8(buf, &rsp->type); + buf = taosDecodeFixedI32(buf, &rsp->code); + + switch (rsp->type) { + case UDF_TASK_SETUP: + buf = decodeUdfSetupResponse(buf, &rsp->setupRsp); + break; + case UDF_TASK_CALL: + buf = decodeUdfCallResponse(buf, &rsp->callRsp); + break; + case UDF_TASK_TEARDOWN: + buf = decodeUdfTeardownResponse(buf, &rsp->teardownRsp); + break; + default: + //TODO: log error + break; } - return size; -} - -int32_t estimateUdfResponseLen(SUdfResponse *response) { - int32_t size = sizeof(SUdfResponse); - if (response->type == UDF_TASK_CALL) { - size += response->callRsp.interBuf.bufLen; - SUdfColumnData *resultData = &response->callRsp.resultData; - if (!resultData->varLengthColumn) { - size += resultData->nullBitmapLen; - size += resultData->dataLen; - } else { - size += resultData->varOffsetsLen; - size += resultData->payloadLen; - } - } - - return size; -} - -int32_t encodeRequest(char **pBuf, int32_t *pBufLen, SUdfRequest *request) { - debugPrint("%s", "encoding request"); - - int len = estimateUdfRequestLen(request); - - char *bufBegin = taosMemoryMalloc(len); - char *buf = bufBegin; - serializeUdfRequest(request, buf); - *pBuf = bufBegin; - *pBufLen = request->msgLen; - debugPrint("\tLen: estimate: %d, actual:%d", len, *pBufLen); - return 0; -} - -int32_t decodeRequest(char *bufMsg, int32_t bufLen, SUdfRequest *pRequest) { - debugPrint("%s", "decoding request"); - if (*(int32_t *) bufMsg != bufLen) { - debugPrint("%s", "decoding request error"); - return -1; - } - char *buf = bufMsg; - deserializeUdfRequest(pRequest, buf); - return 0; -} - -int32_t encodeResponse(char **pBuf, int32_t *pBufLen, SUdfResponse *response) { - debugPrint("%s", "encoding response"); - int32_t len = estimateUdfResponseLen(response); - - char *bufBegin = taosMemoryMalloc(len); - char *buf = bufBegin; - serializeUdfResponse(response, buf); - *pBuf = bufBegin; - *pBufLen = response->msgLen; - return 0; -} - -int32_t decodeResponse(char *bufMsg, int32_t bufLen, SUdfResponse *rsp) { - debugPrint("%s", "decoding response"); - - if (*(int32_t *) bufMsg != bufLen) { - debugPrint("%s", "can not decode response"); - return -1; - } - char *buf = bufMsg; - deserializeUdfResponse(rsp, buf); - return 0; + return (void*)buf; } void freeUdfColumnData(SUdfColumnData *data) { @@ -770,7 +504,8 @@ int32_t udfcGetUvTaskResponseResult(SClientUdfTask *task, SClientUvTaskNode *uvT if (uvTask->type == UV_TASK_REQ_RSP) { if (uvTask->rspBuf.base != NULL) { SUdfResponse rsp; - decodeResponse(uvTask->rspBuf.base, uvTask->rspBuf.len, &rsp); + void* buf = decodeUdfResponse(uvTask->rspBuf.base, &rsp); + assert(uvTask->rspBuf.len == POINTER_DISTANCE(buf, uvTask->rspBuf.base)); task->errCode = rsp.code; switch (task->type) { @@ -973,9 +708,10 @@ int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN } else { //TODO log and return error } - char *buf = NULL; - int32_t bufLen = 0; - encodeRequest(&buf, &bufLen, &request); + int32_t bufLen = encodeUdfRequest(NULL, &request); + request.msgLen = bufLen; + void *buf = taosMemoryMalloc(bufLen); + encodeUdfRequest(&buf, &request); uvTask->reqBuf = uv_buf_init(buf, bufLen); uvTask->seqNum = request.seqNum; } else if (uvTaskType == UV_TASK_DISCONNECT) { @@ -1226,8 +962,8 @@ int32_t setupUdf(char udfName[], SEpSet *epSet, UdfHandle *handle) { return err; } -int32_t callUdf(UdfHandle handle, int8_t callType, SUdfDataBlock *input, SUdfInterBuf *state, - SUdfColumnData* output, SUdfInterBuf *newState, bool initFirst) { +int32_t callUdf(UdfHandle handle, int8_t callType, int8_t initFirst, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2, + SSDataBlock* output, SUdfInterBuf *newState) { debugPrint("%s", "client call udf"); SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask)); @@ -1237,14 +973,21 @@ int32_t callUdf(UdfHandle handle, int8_t callType, SUdfDataBlock *input, SUdfInt SUdfCallRequest *req = &task->_call.req; switch (callType) { + case TSDB_UDF_CALL_AGG_INIT: { + req->initFirst = 1; + break; + } case TSDB_UDF_CALL_AGG_PROC: { req->block = *input; req->interBuf = *state; - req->initFirst = initFirst; break; } - - case TSDb_UDF_CALL_AGG_FIN: { + case TSDB_UDF_CALL_AGG_MERGE: { + req->interBuf = *state; + req->interBuf2 = *state2; + break; + } + case TSDB_UDF_CALL_AGG_FIN: { req->interBuf = *state; break; } @@ -1258,12 +1001,19 @@ int32_t callUdf(UdfHandle handle, int8_t callType, SUdfDataBlock *input, SUdfInt SUdfCallResponse *rsp = &task->_call.rsp; switch (callType) { + case TSDB_UDF_CALL_AGG_INIT: { + *newState = rsp->interBuf; + break; + } case TSDB_UDF_CALL_AGG_PROC: { *newState = rsp->interBuf; break; } - - case TSDb_UDF_CALL_AGG_FIN: { + case TSDB_UDF_CALL_AGG_MERGE: { + *newState = rsp->interBuf; + break; + } + case TSDB_UDF_CALL_AGG_FIN: { *output = rsp->resultData; break; } diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index a1996133e7..76c731a473 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -62,7 +62,7 @@ typedef struct SUdfHandle { void udfdProcessRequest(uv_work_t *req) { SUvUdfWork *uvUdf = (SUvUdfWork *) (req->data); SUdfRequest request = {0}; - decodeRequest(uvUdf->input.base, uvUdf->input.len, &request); + decodeUdfRequest(uvUdf->input.base, &request); switch (request.type) { case UDF_TASK_SETUP: { @@ -94,9 +94,10 @@ void udfdProcessRequest(uv_work_t *req) { rsp.type = request.type; rsp.code = 0; rsp.setupRsp.udfHandle = (int64_t) (handle); - char *buf; - int32_t len; - encodeResponse(&buf, &len, &rsp); + int32_t len = encodeUdfResponse(NULL, &rsp); + rsp.msgLen = len; + void *buf = taosMemoryMalloc(len); + encodeUdfResponse(&buf, &rsp); uvUdf->output = uv_buf_init(buf, len); @@ -110,7 +111,8 @@ void udfdProcessRequest(uv_work_t *req) { SUdfHandle *handle = (SUdfHandle *) (call->udfHandle); SUdf *udf = handle->udf; - SUdfDataBlock input = call->block; + SUdfDataBlock input = {0}; + //TODO: convertSDataBlockToUdfDataBlock(call->block, &input); SUdfColumnData output; //TODO: call different functions according to call type, for now just calar if (call->callType == TSDB_UDF_CALL_SCALA_PROC) { @@ -124,12 +126,13 @@ void udfdProcessRequest(uv_work_t *req) { rsp->type = request.type; rsp->code = 0; SUdfCallResponse *subRsp = &rsp->callRsp; - subRsp->resultData = output; + //TODO: convertSUdfColumnDataToSSDataBlock(output, &subRsp->resultData); } - char *buf; - int32_t len; - encodeResponse(&buf, &len, rsp); + int32_t len = encodeUdfResponse(NULL, rsp); + rsp->msgLen = len; + void *buf = taosMemoryMalloc(len); + encodeUdfResponse(&buf, rsp); uvUdf->output = uv_buf_init(buf, len); //TODO: free @@ -158,9 +161,10 @@ void udfdProcessRequest(uv_work_t *req) { rsp->type = request.type; rsp->code = 0; SUdfTeardownResponse *subRsp = &response.teardownRsp; - char *buf; - int32_t len; - encodeResponse(&buf, &len, rsp); + int32_t len = encodeUdfResponse(NULL, rsp); + void *buf = taosMemoryMalloc(len); + rsp->msgLen = len; + encodeUdfResponse(&buf, rsp); uvUdf->output = uv_buf_init(buf, len); taosMemoryFree(uvUdf->input.base);