From 0776427f45724b01099a23e418bf5e53d105d27b Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 19 Apr 2022 16:51:02 +0800 Subject: [PATCH] runUdf passed --- source/libs/function/inc/tudf.h | 24 +++++---- source/libs/function/src/tudf.c | 82 +++++++++++++++++------------- source/libs/function/src/udfd.c | 8 +-- source/libs/function/test/runUdf.c | 6 ++- source/libs/function/test/udf1.c | 55 +++++++++++--------- 5 files changed, 103 insertions(+), 72 deletions(-) diff --git a/source/libs/function/inc/tudf.h b/source/libs/function/inc/tudf.h index 74aab09da2..8ec02c777f 100644 --- a/source/libs/function/inc/tudf.h +++ b/source/libs/function/inc/tudf.h @@ -68,17 +68,19 @@ typedef struct SUdfColumnData { int32_t numOfRows; bool varLengthColumn; union { - int32_t nullBitmapLen; - char* nullBitmap; - int32_t dataLen; - char* data; - }; + struct { + int32_t nullBitmapLen; + char *nullBitmap; + int32_t dataLen; + char *data; + } fixLenCol; - union { - int32_t varOffsetsLen; - char* varOffsets; - int32_t payloadLen; - char* payload; + struct { + int32_t varOffsetsLen; + char *varOffsets; + int32_t payloadLen; + char *payload; + } varLenCol; }; } SUdfColumnData; @@ -136,7 +138,7 @@ typedef int32_t (*TUdfTeardownFunc)(); //typedef int32_t addFixedLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t colBytes, char* data); //typedef int32_t addVariableLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t dataLen, char * data); -typedef int32_t (*TUdfFreeUdfColumnFunc)(SUdfColumn* columnData); +typedef int32_t (*TUdfFreeUdfColumnFunc)(SUdfColumn* column); typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock block, SUdfColumn *resultCol); typedef int32_t (*TUdfAggInitFunc)(SUdfInterBuf *buf); diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index c5542424f0..c41447b584 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -361,7 +361,7 @@ int32_t encodeUdfCallResponse(void 
**buf, const SUdfCallResponse *callRsp) { len += encodeUdfInterBuf(buf, &callRsp->resultBuf); break; case TSDB_UDF_CALL_AGG_FIN: - len += tEncodeDataBlock(buf, &callRsp->resultData); + len += encodeUdfInterBuf(buf, &callRsp->resultBuf); break; } return len; @@ -383,7 +383,7 @@ void* decodeUdfCallResponse(const void* buf, SUdfCallResponse* callRsp) { buf = decodeUdfInterBuf(buf, &callRsp->resultBuf); break; case TSDB_UDF_CALL_AGG_FIN: - buf = tDecodeDataBlock(buf, &callRsp->resultData); + buf = decodeUdfInterBuf(buf, &callRsp->resultBuf); break; } return (void*)buf; @@ -406,6 +406,13 @@ int32_t encodeUdfResponse(void** buf, const SUdfResponse* rsp) { *buf = POINTER_SHIFT(*buf, sizeof(rsp->msgLen)); } + if (buf == NULL) { + len += sizeof(rsp->seqNum); + } else { + *(int64_t*)(*buf) = rsp->seqNum; + *buf = POINTER_SHIFT(*buf, sizeof(rsp->seqNum)); + } + len += taosEncodeFixedI64(buf, rsp->seqNum); len += taosEncodeFixedI8(buf, rsp->type); len += taosEncodeFixedI32(buf, rsp->code); @@ -430,6 +437,8 @@ int32_t encodeUdfResponse(void** buf, const SUdfResponse* rsp) { void* decodeUdfResponse(const void* buf, SUdfResponse* rsp) { rsp->msgLen = *(int32_t*)(buf); buf = POINTER_SHIFT(buf, sizeof(rsp->msgLen)); + rsp->seqNum = *(int64_t*)(buf); + buf = POINTER_SHIFT(buf, sizeof(rsp->seqNum)); buf = taosDecodeFixedI64(buf, &rsp->seqNum); buf = taosDecodeFixedI8(buf, &rsp->type); buf = taosDecodeFixedI32(buf, &rsp->code); @@ -453,15 +462,15 @@ void* decodeUdfResponse(const void* buf, SUdfResponse* rsp) { void freeUdfColumnData(SUdfColumnData *data) { if (data->varLengthColumn) { - taosMemoryFree(data->varOffsets); - data->varOffsets = NULL; - taosMemoryFree(data->payload); - data->payload = NULL; + taosMemoryFree(data->varLenCol.varOffsets); + data->varLenCol.varOffsets = NULL; + taosMemoryFree(data->varLenCol.payload); + data->varLenCol.payload = NULL; } else { - taosMemoryFree(data->nullBitmap); - data->nullBitmap = NULL; - taosMemoryFree(data->data); - data->data = NULL; + 
taosMemoryFree(data->fixLenCol.nullBitmap); + data->fixLenCol.nullBitmap = NULL; + taosMemoryFree(data->fixLenCol.data); + data->fixLenCol.data = NULL; } } @@ -488,9 +497,9 @@ void freeUdfInterBuf(SUdfInterBuf *buf) { int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock) { udfBlock->numOfRows = block->info.rows; udfBlock->numOfCols = block->info.numOfCols; - udfBlock->udfCols = taosMemoryMalloc(sizeof(SUdfColumn*) * udfBlock->numOfCols); + udfBlock->udfCols = taosMemoryCalloc(udfBlock->numOfCols, sizeof(SUdfColumn*)); for (int32_t i = 0; i < udfBlock->numOfCols; ++i) { - udfBlock->udfCols[i] = taosMemoryMalloc(sizeof(SUdfColumn)); + udfBlock->udfCols[i] = taosMemoryCalloc(1, sizeof(SUdfColumn)); SColumnInfoData *col= (SColumnInfoData*)taosArrayGet(block->pDataBlock, i); SUdfColumn *udfCol = udfBlock->udfCols[i]; udfCol->colMeta.type = col->info.type; @@ -500,19 +509,23 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo udfCol->colData.numOfRows = udfBlock->numOfRows; udfCol->colData.varLengthColumn = IS_VAR_DATA_TYPE(udfCol->colMeta.type); if (udfCol->colData.varLengthColumn) { - udfCol->colData.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows; - udfCol->colData.varOffsets = taosMemoryMalloc(udfCol->colData.varOffsetsLen); - memcpy(udfCol->colData.varOffsets, col->varmeta.offset, udfCol->colData.varOffsetsLen); - udfCol->colData.payloadLen = colDataGetLength(col, udfBlock->numOfRows); - udfCol->colData.payload = taosMemoryMalloc(udfCol->colData.payloadLen); - memcpy(udfCol->colData.payload, col->pData, udfCol->colData.payloadLen); + udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows; + udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(udfCol->colData.varLenCol.varOffsetsLen); + memcpy(udfCol->colData.varLenCol.varOffsets, col->varmeta.offset, udfCol->colData.varLenCol.varOffsetsLen); + udfCol->colData.varLenCol.payloadLen = colDataGetLength(col, 
udfBlock->numOfRows); + udfCol->colData.varLenCol.payload = taosMemoryMalloc(udfCol->colData.varLenCol.payloadLen); + memcpy(udfCol->colData.varLenCol.payload, col->pData, udfCol->colData.varLenCol.payloadLen); } else { - udfCol->colData.nullBitmapLen = BitmapLen(udfCol->colData.numOfRows); - udfCol->colData.nullBitmap = taosMemoryMalloc(udfCol->colData.nullBitmapLen); - memcpy(udfCol->colData.nullBitmap, col->nullbitmap, udfCol->colData.nullBitmapLen); - udfCol->colData.dataLen = colDataGetLength(col, udfBlock->numOfRows); - udfCol->colData.data = taosMemoryMalloc(udfCol->colData.dataLen); - memcpy(udfCol->colData.data, col->pData, udfCol->colData.dataLen); + udfCol->colData.fixLenCol.nullBitmapLen = BitmapLen(udfCol->colData.numOfRows); + int32_t bitmapLen = udfCol->colData.fixLenCol.nullBitmapLen; + udfCol->colData.fixLenCol.nullBitmap = taosMemoryMalloc(udfCol->colData.fixLenCol.nullBitmapLen); + char* bitmap = udfCol->colData.fixLenCol.nullBitmap; + memcpy(bitmap, col->nullbitmap, bitmapLen); + udfCol->colData.fixLenCol.dataLen = colDataGetLength(col, udfBlock->numOfRows); + int32_t dataLen = udfCol->colData.fixLenCol.dataLen; + udfCol->colData.fixLenCol.data = taosMemoryMalloc(udfCol->colData.fixLenCol.dataLen); + char* data = udfCol->colData.fixLenCol.data; + memcpy(data, col->pData, dataLen); } } return 0; @@ -534,15 +547,15 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) { SUdfColumnData *data = &udfCol->colData; if (!IS_VAR_DATA_TYPE(meta->type)) { - col->nullbitmap = taosMemoryMalloc(data->nullBitmapLen); - memcpy(col->nullbitmap, data->nullBitmap, data->nullBitmapLen); - col->pData = taosMemoryMalloc(data->dataLen); - memcpy(col->pData, data->payload, data->dataLen); + col->nullbitmap = taosMemoryMalloc(data->fixLenCol.nullBitmapLen); + memcpy(col->nullbitmap, data->fixLenCol.nullBitmap, data->fixLenCol.nullBitmapLen); + col->pData = taosMemoryMalloc(data->fixLenCol.dataLen); + memcpy(col->pData, data->fixLenCol.data, 
data->fixLenCol.dataLen); } else { - col->varmeta.offset = taosMemoryMalloc(data->varOffsetsLen); - memcpy(col->varmeta.offset, data->varOffsets, data->varOffsetsLen); - col->pData = taosMemoryMalloc(data->payloadLen); - memcpy(col->pData, data->payload, data->payloadLen); + col->varmeta.offset = taosMemoryMalloc(data->varLenCol.varOffsetsLen); + memcpy(col->varmeta.offset, data->varLenCol.varOffsets, data->varLenCol.varOffsetsLen); + col->pData = taosMemoryMalloc(data->varLenCol.payloadLen); + memcpy(col->pData, data->varLenCol.payload, data->varLenCol.payloadLen); } return 0; } @@ -932,7 +945,7 @@ void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) { gUdfcState = UDFC_STATE_RESTARTING; //TODO: asynchronous without blocking. how to do it cleanUpUvTasks(); - //startUdfd(); + startUdfd(); } } @@ -967,7 +980,7 @@ void constructUdfService(void *argsThread) { uv_loop_init(&gUdfdLoop); //TODO spawn error - //startUdfd(); + startUdfd(); uv_async_init(&gUdfdLoop, &gUdfLoopTaskAync, udfClientAsyncCb); uv_async_init(&gUdfdLoop, &gUdfLoopStopAsync, udfStopAsyncCb); @@ -1053,6 +1066,7 @@ int32_t callUdf(UdfHandle handle, int8_t callType, SSDataBlock *input, SUdfInter SUdfCallRequest *req = &task->_call.req; req->udfHandle = task->session->severHandle; + req->callType = callType; switch (callType) { case TSDB_UDF_CALL_AGG_INIT: { diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 50f8bdbd93..a02c94c109 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -84,9 +84,10 @@ void udfdProcessRequest(uv_work_t *req) { //TODO error, multi-thread, same udf, lock it //TODO find all functions normal, init, destroy, normal, merge, finalize uv_dlsym(&udf->lib, normalFuncName, (void **) (&udf->scalarProcFunc)); - char freeFuncName[TSDB_FUNC_NAME_LEN + 5]; + char freeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; + char *freeSuffix = "_free"; strncpy(freeFuncName, normalFuncName, strlen(normalFuncName)); - 
strcat(freeFuncName, "_free"); + strncat(freeFuncName, freeSuffix, strlen(freeSuffix)); uv_dlsym(&udf->lib, freeFuncName, (void **)(&udf->freeUdfColumn)); SUdfHandle *handle = taosMemoryMalloc(sizeof(SUdfHandle)); @@ -118,7 +119,7 @@ void udfdProcessRequest(uv_work_t *req) { SUdfDataBlock input = {0}; convertDataBlockToUdfDataBlock(&call->block, &input); - SUdfColumn output; + SUdfColumn output = {0}; //TODO: call different functions according to call type, for now just calar if (call->callType == TSDB_UDF_CALL_SCALA_PROC) { udf->scalarProcFunc(input, &output); @@ -131,6 +132,7 @@ void udfdProcessRequest(uv_work_t *req) { rsp->type = request.type; rsp->code = 0; SUdfCallResponse *subRsp = &rsp->callRsp; + subRsp->callType = call->callType; convertUdfColumnToDataBlock(&output, &subRsp->resultData); } diff --git a/source/libs/function/test/runUdf.c b/source/libs/function/test/runUdf.c index a03e006f7f..28dc6bb99a 100644 --- a/source/libs/function/test/runUdf.c +++ b/source/libs/function/test/runUdf.c @@ -30,7 +30,7 @@ int main(int argc, char *argv[]) { pBlock->info.numOfCols = 1; pBlock->info.rows = 4; char data[16] = {0}; - char bitmap[1] = {0}; + char bitmap[4] = {0}; for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { SColumnInfoData colInfo = {0}; colInfo.info.type = TSDB_DATA_TYPE_INT; @@ -47,6 +47,10 @@ int main(int argc, char *argv[]) { SSDataBlock output = {0}; callUdfScalaProcess(handle, pBlock, &output); + SColumnInfoData *col = taosArrayGet(output.pDataBlock, 0); + for (int32_t i = 0; i < output.info.rows; ++i) { + fprintf(stderr, "%d\t%d\n" , i, *(int32_t*)(col->pData + i *sizeof(int32_t))); + } teardownUdf(handle); stopUdfService(); diff --git a/source/libs/function/test/udf1.c b/source/libs/function/test/udf1.c index 31e7a36e9c..94cab9fee9 100644 --- a/source/libs/function/test/udf1.c +++ b/source/libs/function/test/udf1.c @@ -17,44 +17,53 @@ int32_t udf1_teardown() { return 0; } -int32_t udf1(SUdfDataBlock block, SUdfColumnData *resultData) { - 
+int32_t udf1(SUdfDataBlock block, SUdfColumn *resultCol) { + SUdfColumnData *resultData = &resultCol->colData; resultData->numOfRows = block.numOfRows; SUdfColumnData *srcData = &block.udfCols[0]->colData; resultData->varLengthColumn = srcData->varLengthColumn; if (resultData->varLengthColumn) { - resultData->varOffsetsLen = srcData->varOffsetsLen; - resultData->varOffsets = malloc(resultData->varOffsetsLen); - memcpy(resultData->varOffsets, srcData->varOffsets, srcData->varOffsetsLen); + resultData->varLenCol.varOffsetsLen = srcData->varLenCol.varOffsetsLen; + resultData->varLenCol.varOffsets = malloc(resultData->varLenCol.varOffsetsLen); + memcpy(resultData->varLenCol.varOffsets, srcData->varLenCol.varOffsets, srcData->varLenCol.varOffsetsLen); - resultData->payloadLen = srcData->payloadLen; - resultData->payload = malloc(resultData->payloadLen); - memcpy(resultData->payload, srcData->payload, srcData->payloadLen); + resultData->varLenCol.payloadLen = srcData->varLenCol.payloadLen; + resultData->varLenCol.payload = malloc(resultData->varLenCol.payloadLen); + memcpy(resultData->varLenCol.payload, srcData->varLenCol.payload, srcData->varLenCol.payloadLen); } else { - resultData->nullBitmapLen = srcData->nullBitmapLen; - resultData->nullBitmap = malloc(resultData->nullBitmapLen); - memcpy(resultData->nullBitmap, srcData->nullBitmap, srcData->nullBitmapLen); + resultData->fixLenCol.nullBitmapLen = srcData->fixLenCol.nullBitmapLen; + resultData->fixLenCol.nullBitmap = malloc(resultData->fixLenCol.nullBitmapLen); + memcpy(resultData->fixLenCol.nullBitmap, srcData->fixLenCol.nullBitmap, srcData->fixLenCol.nullBitmapLen); - resultData->dataLen = srcData->dataLen; - resultData->data = malloc(resultData->dataLen); - memcpy(resultData->data, srcData->data, srcData->dataLen); + resultData->fixLenCol.dataLen = srcData->fixLenCol.dataLen; + resultData->fixLenCol.data = malloc(resultData->fixLenCol.dataLen); + memcpy(resultData->fixLenCol.data, srcData->fixLenCol.data,
srcData->fixLenCol.dataLen); + for (int32_t i = 0; i < resultData->numOfRows; ++i) { + *(int32_t *)(resultData->fixLenCol.data + i * sizeof(int32_t)) = 88; /* data is char*: cast so the whole 4-byte cell is stored, not only its first byte */ + } } + SUdfColumnMeta *meta = &resultCol->colMeta; + meta->bytes = 4; + meta->type = TSDB_DATA_TYPE_INT; + meta->scale = 0; + meta->precision = 0; return 0; } -int32_t udf1_free(SUdfColumnData *data) { +int32_t udf1_free(SUdfColumn *col) { + SUdfColumnData *data = &col->colData; if (data->varLengthColumn) { - free(data->varOffsets); - data->varOffsets = NULL; - free(data->payload); - data->payload = NULL; + free(data->varLenCol.varOffsets); + data->varLenCol.varOffsets = NULL; + free(data->varLenCol.payload); + data->varLenCol.payload = NULL; } else { - free(data->nullBitmap); - data->nullBitmap = NULL; - free(data->data); - data->data = NULL; + free(data->fixLenCol.nullBitmap); + data->fixLenCol.nullBitmap = NULL; + free(data->fixLenCol.data); + data->fixLenCol.data = NULL; } return 0; } \ No newline at end of file