befor debugging teardown

This commit is contained in:
slzhou 2022-04-19 12:00:59 +08:00
parent eae11bf8f0
commit 71983725c3
4 changed files with 34 additions and 20 deletions

View File

@ -136,7 +136,7 @@ typedef int32_t (*TUdfTeardownFunc)();
//typedef int32_t addFixedLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t colBytes, char* data); //typedef int32_t addFixedLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t colBytes, char* data);
//typedef int32_t addVariableLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t dataLen, char * data); //typedef int32_t addVariableLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t dataLen, char * data);
typedef int32_t (*TUdfFreeUdfColumnDataFunc)(SUdfColumn* columnData); typedef int32_t (*TUdfFreeUdfColumnFunc)(SUdfColumn* columnData);
typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock block, SUdfColumn *resultCol); typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock block, SUdfColumn *resultCol);
typedef int32_t (*TUdfAggInitFunc)(SUdfInterBuf *buf); typedef int32_t (*TUdfAggInitFunc)(SUdfInterBuf *buf);

View File

@ -318,7 +318,7 @@ int32_t encodeUdfRequest(void** buf, const SUdfRequest* request) {
void* decodeUdfRequest(const void* buf, SUdfRequest* request) { void* decodeUdfRequest(const void* buf, SUdfRequest* request) {
request->msgLen = *(int32_t*)(buf); request->msgLen = *(int32_t*)(buf);
POINTER_SHIFT(buf, sizeof(request->msgLen)); buf = POINTER_SHIFT(buf, sizeof(request->msgLen));
buf = taosDecodeFixedI64(buf, &request->seqNum); buf = taosDecodeFixedI64(buf, &request->seqNum);
buf = taosDecodeFixedI8(buf, &request->type); buf = taosDecodeFixedI8(buf, &request->type);
@ -429,7 +429,7 @@ int32_t encodeUdfResponse(void** buf, const SUdfResponse* rsp) {
void* decodeUdfResponse(const void* buf, SUdfResponse* rsp) { void* decodeUdfResponse(const void* buf, SUdfResponse* rsp) {
rsp->msgLen = *(int32_t*)(buf); rsp->msgLen = *(int32_t*)(buf);
POINTER_SHIFT(buf, sizeof(rsp->msgLen)); buf = POINTER_SHIFT(buf, sizeof(rsp->msgLen));
buf = taosDecodeFixedI64(buf, &rsp->seqNum); buf = taosDecodeFixedI64(buf, &rsp->seqNum);
buf = taosDecodeFixedI8(buf, &rsp->type); buf = taosDecodeFixedI8(buf, &rsp->type);
buf = taosDecodeFixedI32(buf, &rsp->code); buf = taosDecodeFixedI32(buf, &rsp->code);
@ -786,9 +786,10 @@ int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN
} }
int32_t bufLen = encodeUdfRequest(NULL, &request); int32_t bufLen = encodeUdfRequest(NULL, &request);
request.msgLen = bufLen; request.msgLen = bufLen;
void *buf = taosMemoryMalloc(bufLen); void *bufBegin = taosMemoryMalloc(bufLen);
void *buf = bufBegin;
encodeUdfRequest(&buf, &request); encodeUdfRequest(&buf, &request);
uvTask->reqBuf = uv_buf_init(buf, bufLen); uvTask->reqBuf = uv_buf_init(bufBegin, bufLen);
uvTask->seqNum = request.seqNum; uvTask->seqNum = request.seqNum;
} else if (uvTaskType == UV_TASK_DISCONNECT) { } else if (uvTaskType == UV_TASK_DISCONNECT) {
uvTask->pipe = task->session->udfSvcPipe; uvTask->pipe = task->session->udfSvcPipe;
@ -931,7 +932,7 @@ void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) {
gUdfcState = UDFC_STATE_RESTARTING; gUdfcState = UDFC_STATE_RESTARTING;
//TODO: asynchronous without blocking. how to do it //TODO: asynchronous without blocking. how to do it
cleanUpUvTasks(); cleanUpUvTasks();
startUdfd(); //startUdfd();
} }
} }
@ -966,7 +967,7 @@ void constructUdfService(void *argsThread) {
uv_loop_init(&gUdfdLoop); uv_loop_init(&gUdfdLoop);
//TODO spawn error //TODO spawn error
startUdfd(); //startUdfd();
uv_async_init(&gUdfdLoop, &gUdfLoopTaskAync, udfClientAsyncCb); uv_async_init(&gUdfdLoop, &gUdfLoopTaskAync, udfClientAsyncCb);
uv_async_init(&gUdfdLoop, &gUdfLoopStopAsync, udfStopAsyncCb); uv_async_init(&gUdfdLoop, &gUdfLoopStopAsync, udfStopAsyncCb);
@ -1009,7 +1010,8 @@ int32_t udfcRunUvTask(SClientUdfTask *task, int8_t uvTaskType) {
udfcGetUvTaskResponseResult(task, uvTask); udfcGetUvTaskResponseResult(task, uvTask);
if (uvTaskType == UV_TASK_CONNECT) { if (uvTaskType == UV_TASK_CONNECT) {
task->session->udfSvcPipe = uvTask->pipe; task->session->udfSvcPipe = uvTask->pipe;
} taosMemoryFree(uvTask); }
taosMemoryFree(uvTask);
uvTask = NULL; uvTask = NULL;
return task->errCode; return task->errCode;
} }
@ -1050,6 +1052,8 @@ int32_t callUdf(UdfHandle handle, int8_t callType, SSDataBlock *input, SUdfInter
task->type = UDF_TASK_CALL; task->type = UDF_TASK_CALL;
SUdfCallRequest *req = &task->_call.req; SUdfCallRequest *req = &task->_call.req;
req->udfHandle = task->session->severHandle;
switch (callType) { switch (callType) {
case TSDB_UDF_CALL_AGG_INIT: { case TSDB_UDF_CALL_AGG_INIT: {
req->initFirst = 1; req->initFirst = 1;

View File

@ -49,7 +49,7 @@ typedef struct SUdf {
uv_lib_t lib; uv_lib_t lib;
TUdfScalarProcFunc scalarProcFunc; TUdfScalarProcFunc scalarProcFunc;
TUdfFreeUdfColumnDataFunc freeUdfColumnData; TUdfFreeUdfColumnFunc freeUdfColumn;
} SUdf; } SUdf;
//TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix //TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix
@ -72,18 +72,22 @@ void udfdProcessRequest(uv_work_t *req) {
SUdfSetupRequest *setup = &request.setup; SUdfSetupRequest *setup = &request.setup;
strcpy(udf->name, setup->udfName); strcpy(udf->name, setup->udfName);
//TODO: retrive udf info from mnode //TODO: retrive udf info from mnode
char* path = "udf1.so"; char* path = "libudf1.so";
int err = uv_dlopen(path, &udf->lib); int err = uv_dlopen(path, &udf->lib);
if (err != 0) { if (err != 0) {
debugPrint("can not load library %s. error: %s", path, uv_strerror(err)); debugPrint("can not load library %s. error: %s", path, uv_strerror(err));
//TODO set error //TODO set error
} }
char normalFuncName[32] = {0}; char normalFuncName[TSDB_FUNC_NAME_LEN] = {0};
strcpy(normalFuncName, setup->udfName); strcpy(normalFuncName, setup->udfName);
//TODO error, //TODO error, multi-thread, same udf, lock it
//TODO find all functions normal, init, destroy, normal, merge, finalize //TODO find all functions normal, init, destroy, normal, merge, finalize
uv_dlsym(&udf->lib, normalFuncName, (void **) (&udf->scalarProcFunc)); uv_dlsym(&udf->lib, normalFuncName, (void **) (&udf->scalarProcFunc));
char freeFuncName[TSDB_FUNC_NAME_LEN + 5];
strncpy(freeFuncName, normalFuncName, strlen(normalFuncName));
strcat(freeFuncName, "_free");
uv_dlsym(&udf->lib, freeFuncName, (void **)(&udf->freeUdfColumn));
SUdfHandle *handle = taosMemoryMalloc(sizeof(SUdfHandle)); SUdfHandle *handle = taosMemoryMalloc(sizeof(SUdfHandle));
handle->udf = udf; handle->udf = udf;
@ -96,10 +100,11 @@ void udfdProcessRequest(uv_work_t *req) {
rsp.setupRsp.udfHandle = (int64_t) (handle); rsp.setupRsp.udfHandle = (int64_t) (handle);
int32_t len = encodeUdfResponse(NULL, &rsp); int32_t len = encodeUdfResponse(NULL, &rsp);
rsp.msgLen = len; rsp.msgLen = len;
void *buf = taosMemoryMalloc(len); void *bufBegin = taosMemoryMalloc(len);
void *buf = bufBegin;
encodeUdfResponse(&buf, &rsp); encodeUdfResponse(&buf, &rsp);
uvUdf->output = uv_buf_init(buf, len); uvUdf->output = uv_buf_init(bufBegin, len);
taosMemoryFree(uvUdf->input.base); taosMemoryFree(uvUdf->input.base);
break; break;
@ -131,12 +136,13 @@ void udfdProcessRequest(uv_work_t *req) {
int32_t len = encodeUdfResponse(NULL, rsp); int32_t len = encodeUdfResponse(NULL, rsp);
rsp->msgLen = len; rsp->msgLen = len;
void *buf = taosMemoryMalloc(len); void *bufBegin = taosMemoryMalloc(len);
void *buf = bufBegin;
encodeUdfResponse(&buf, rsp); encodeUdfResponse(&buf, rsp);
uvUdf->output = uv_buf_init(buf, len); uvUdf->output = uv_buf_init(bufBegin, len);
//TODO: free //TODO: free
udf->freeUdfColumnData(&output); udf->freeUdfColumn(&output);
taosMemoryFree(uvUdf->input.base); taosMemoryFree(uvUdf->input.base);
break; break;
@ -160,12 +166,12 @@ void udfdProcessRequest(uv_work_t *req) {
rsp->seqNum = request.seqNum; rsp->seqNum = request.seqNum;
rsp->type = request.type; rsp->type = request.type;
rsp->code = 0; rsp->code = 0;
SUdfTeardownResponse *subRsp = &response.teardownRsp;
int32_t len = encodeUdfResponse(NULL, rsp); int32_t len = encodeUdfResponse(NULL, rsp);
void *buf = taosMemoryMalloc(len);
rsp->msgLen = len; rsp->msgLen = len;
void *bufBegin = taosMemoryMalloc(len);
void *buf = bufBegin;
encodeUdfResponse(&buf, rsp); encodeUdfResponse(&buf, rsp);
uvUdf->output = uv_buf_init(buf, len); uvUdf->output = uv_buf_init(bufBegin, len);
taosMemoryFree(uvUdf->input.base); taosMemoryFree(uvUdf->input.base);
break; break;

View File

@ -29,11 +29,15 @@ int main(int argc, char *argv[]) {
pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
pBlock->info.numOfCols = 1; pBlock->info.numOfCols = 1;
pBlock->info.rows = 4; pBlock->info.rows = 4;
char data[16] = {0};
char bitmap[1] = {0};
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData colInfo = {0}; SColumnInfoData colInfo = {0};
colInfo.info.type = TSDB_DATA_TYPE_INT; colInfo.info.type = TSDB_DATA_TYPE_INT;
colInfo.info.bytes = sizeof(int32_t); colInfo.info.bytes = sizeof(int32_t);
colInfo.info.colId = 1; colInfo.info.colId = 1;
colInfo.pData = data;
colInfo.nullbitmap = bitmap;
for (int32_t j = 0; j < pBlock->info.rows; ++j) { for (int32_t j = 0; j < pBlock->info.rows; ++j) {
colDataAppendInt32(&colInfo, j, &j); colDataAppendInt32(&colInfo, j, &j);
} }