Merge pull request #12604 from taosdata/feature/udf
feat: remove invalid udf function handle cache
This commit is contained in:
commit
fff19722e4
|
@ -1314,6 +1314,90 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int compareUdfcFuncSub(const void* elem1, const void* elem2) {
|
||||||
|
SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1;
|
||||||
|
SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2;
|
||||||
|
return strcmp(stub1->udfName, stub2->udfName);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t acquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) {
|
||||||
|
int32_t code = 0;
|
||||||
|
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
|
||||||
|
SUdfcFuncStub key = {0};
|
||||||
|
strcpy(key.udfName, udfName);
|
||||||
|
int32_t stubIndex = taosArraySearchIdx(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
|
||||||
|
if (stubIndex != -1) {
|
||||||
|
SUdfcFuncStub *foundStub = taosArrayGet(gUdfdProxy.udfStubs, stubIndex);
|
||||||
|
UdfcFuncHandle handle = foundStub->handle;
|
||||||
|
if (handle != NULL && ((SUdfcUvSession*)handle)->udfUvPipe != NULL) {
|
||||||
|
*pHandle = foundStub->handle;
|
||||||
|
++foundStub->refCount;
|
||||||
|
foundStub->lastRefTime = taosGetTimestampUs();
|
||||||
|
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
fnInfo("invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache",
|
||||||
|
udfName, foundStub->refCount, foundStub->lastRefTime);
|
||||||
|
taosArrayRemove(gUdfdProxy.udfStubs, stubIndex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*pHandle = NULL;
|
||||||
|
code = doSetupUdf(udfName, pHandle);
|
||||||
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
SUdfcFuncStub stub = {0};
|
||||||
|
strcpy(stub.udfName, udfName);
|
||||||
|
stub.handle = *pHandle;
|
||||||
|
++stub.refCount;
|
||||||
|
stub.lastRefTime = taosGetTimestampUs();
|
||||||
|
taosArrayPush(gUdfdProxy.udfStubs, &stub);
|
||||||
|
taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub);
|
||||||
|
} else {
|
||||||
|
*pHandle = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
void releaseUdfFuncHandle(char* udfName) {
|
||||||
|
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
|
||||||
|
SUdfcFuncStub key = {0};
|
||||||
|
strcpy(key.udfName, udfName);
|
||||||
|
SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
|
||||||
|
ASSERT(foundStub);
|
||||||
|
--foundStub->refCount;
|
||||||
|
ASSERT(foundStub->refCount>=0);
|
||||||
|
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t cleanUpUdfs() {
|
||||||
|
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
|
||||||
|
int32_t i = 0;
|
||||||
|
SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
|
||||||
|
while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) {
|
||||||
|
SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i);
|
||||||
|
if (stub->refCount == 0) {
|
||||||
|
fnInfo("tear down udf. udf name: %s, handle: %p", stub->udfName, stub->handle);
|
||||||
|
doTeardownUdf(stub->handle);
|
||||||
|
} else {
|
||||||
|
fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p",
|
||||||
|
stub->udfName, stub->refCount, stub->lastRefTime, stub->handle);
|
||||||
|
UdfcFuncHandle handle = stub->handle;
|
||||||
|
if (handle != NULL && ((SUdfcUvSession*)handle)->udfUvPipe != NULL) {
|
||||||
|
taosArrayPush(udfStubs, stub);
|
||||||
|
} else {
|
||||||
|
fnInfo("invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache",
|
||||||
|
stub->udfName, stub->refCount, stub->lastRefTime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
++i;
|
||||||
|
}
|
||||||
|
taosArrayDestroy(gUdfdProxy.udfStubs);
|
||||||
|
gUdfdProxy.udfStubs = udfStubs;
|
||||||
|
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
|
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
|
||||||
SSDataBlock* output, SUdfInterBuf *newState) {
|
SSDataBlock* output, SUdfInterBuf *newState) {
|
||||||
fnTrace("udfc call udf. callType: %d, funcHandle: %p", callType, handle);
|
fnTrace("udfc call udf. callType: %d, funcHandle: %p", callType, handle);
|
||||||
|
@ -1437,57 +1521,10 @@ int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
int compareUdfcFuncSub(const void* elem1, const void* elem2) {
|
|
||||||
SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1;
|
|
||||||
SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2;
|
|
||||||
return strcmp(stub1->udfName, stub2->udfName);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t accquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) {
|
|
||||||
int32_t code = 0;
|
|
||||||
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
|
|
||||||
SUdfcFuncStub key = {0};
|
|
||||||
strcpy(key.udfName, udfName);
|
|
||||||
SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
|
|
||||||
if (foundStub != NULL) {
|
|
||||||
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
|
|
||||||
*pHandle = foundStub->handle;
|
|
||||||
++foundStub->refCount;
|
|
||||||
foundStub->lastRefTime = taosGetTimestampUs();
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
*pHandle = NULL;
|
|
||||||
code = doSetupUdf(udfName, pHandle);
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
|
||||||
SUdfcFuncStub stub = {0};
|
|
||||||
strcpy(stub.udfName, udfName);
|
|
||||||
stub.handle = *pHandle;
|
|
||||||
++stub.refCount;
|
|
||||||
stub.lastRefTime = taosGetTimestampUs();
|
|
||||||
taosArrayPush(gUdfdProxy.udfStubs, &stub);
|
|
||||||
taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub);
|
|
||||||
} else {
|
|
||||||
*pHandle = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
void releaseUdfFuncHandle(char* udfName) {
|
|
||||||
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
|
|
||||||
SUdfcFuncStub key = {0};
|
|
||||||
strcpy(key.udfName, udfName);
|
|
||||||
SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
|
|
||||||
ASSERT(foundStub);
|
|
||||||
--foundStub->refCount;
|
|
||||||
ASSERT(foundStub->refCount>=0);
|
|
||||||
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
|
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
|
||||||
UdfcFuncHandle handle = NULL;
|
UdfcFuncHandle handle = NULL;
|
||||||
int32_t code = accquireUdfFuncHandle(udfName, &handle);
|
int32_t code = acquireUdfFuncHandle(udfName, &handle);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1549,7 +1586,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
|
||||||
}
|
}
|
||||||
UdfcFuncHandle handle;
|
UdfcFuncHandle handle;
|
||||||
int32_t udfCode = 0;
|
int32_t udfCode = 0;
|
||||||
if ((udfCode = accquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
|
if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
|
||||||
fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode);
|
fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -1662,25 +1699,3 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
|
||||||
releaseUdfFuncHandle(pCtx->udfName);
|
releaseUdfFuncHandle(pCtx->udfName);
|
||||||
return udfCallCode == 0 ? numOfResults : udfCallCode;
|
return udfCallCode == 0 ? numOfResults : udfCallCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t cleanUpUdfs() {
|
|
||||||
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
|
|
||||||
int32_t i = 0;
|
|
||||||
SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
|
|
||||||
while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) {
|
|
||||||
SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i);
|
|
||||||
if (stub->refCount == 0) {
|
|
||||||
fnInfo("tear down udf. udf name: %s, handle: %p", stub->udfName, stub->handle);
|
|
||||||
doTeardownUdf(stub->handle);
|
|
||||||
} else {
|
|
||||||
fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p",
|
|
||||||
stub->udfName, stub->refCount, stub->lastRefTime, stub->handle);
|
|
||||||
taosArrayPush(udfStubs, stub);
|
|
||||||
}
|
|
||||||
++i;
|
|
||||||
}
|
|
||||||
taosArrayDestroy(gUdfdProxy.udfStubs);
|
|
||||||
gUdfdProxy.udfStubs = udfStubs;
|
|
||||||
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
|
|
||||||
return 0;
|
|
||||||
}
|
|
|
@ -86,17 +86,148 @@ typedef struct SUdf {
|
||||||
TUdfDestroyFunc destroyFunc;
|
TUdfDestroyFunc destroyFunc;
|
||||||
} SUdf;
|
} SUdf;
|
||||||
|
|
||||||
// TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix
|
|
||||||
// TODO: add private udf structure.
|
// TODO: add private udf structure.
|
||||||
typedef struct SUdfcFuncHandle {
|
typedef struct SUdfcFuncHandle {
|
||||||
SUdf *udf;
|
SUdf *udf;
|
||||||
} SUdfcFuncHandle;
|
} SUdfcFuncHandle;
|
||||||
|
|
||||||
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf);
|
typedef enum EUdfdRpcReqRspType {
|
||||||
|
UDFD_RPC_MNODE_CONNECT = 0,
|
||||||
|
UDFD_RPC_RETRIVE_FUNC,
|
||||||
|
} EUdfdRpcReqRspType;
|
||||||
|
|
||||||
|
typedef struct SUdfdRpcSendRecvInfo {
|
||||||
|
EUdfdRpcReqRspType rpcType;
|
||||||
|
int32_t code;
|
||||||
|
void* param;
|
||||||
|
uv_sem_t resultSem;
|
||||||
|
} SUdfdRpcSendRecvInfo;
|
||||||
|
|
||||||
|
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->ahandle;
|
||||||
|
ASSERT(pMsg->ahandle != NULL);
|
||||||
|
|
||||||
|
if (pEpSet) {
|
||||||
|
if (!isEpsetEqual(&global.mgmtEp.epSet, pEpSet)) {
|
||||||
|
updateEpSet_s(&global.mgmtEp, pEpSet);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pMsg->code != TSDB_CODE_SUCCESS) {
|
||||||
|
fnError("udfd rpc error. code: %s", tstrerror(pMsg->code));
|
||||||
|
msgInfo->code = pMsg->code;
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) {
|
||||||
|
SConnectRsp connectRsp = {0};
|
||||||
|
tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp);
|
||||||
|
if (connectRsp.epSet.numOfEps == 0) {
|
||||||
|
msgInfo->code = TSDB_CODE_MND_APP_ERROR;
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&global.mgmtEp.epSet, &connectRsp.epSet)) {
|
||||||
|
updateEpSet_s(&global.mgmtEp, &connectRsp.epSet);
|
||||||
|
}
|
||||||
|
msgInfo->code = 0;
|
||||||
|
} else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) {
|
||||||
|
SRetrieveFuncRsp retrieveRsp = {0};
|
||||||
|
tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp);
|
||||||
|
|
||||||
|
SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
|
||||||
|
SUdf* udf = msgInfo->param;
|
||||||
|
udf->funcType = pFuncInfo->funcType;
|
||||||
|
udf->scriptType = pFuncInfo->scriptType;
|
||||||
|
udf->outputType = pFuncInfo->funcType;
|
||||||
|
udf->outputLen = pFuncInfo->outputLen;
|
||||||
|
udf->bufSize = pFuncInfo->bufSize;
|
||||||
|
|
||||||
|
char path[PATH_MAX] = {0};
|
||||||
|
snprintf(path, sizeof(path), "%s/lib%s.so", "/tmp", pFuncInfo->name);
|
||||||
|
TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
|
||||||
|
// TODO check for failure of flush to disk
|
||||||
|
taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
|
||||||
|
taosCloseFile(&file);
|
||||||
|
strncpy(udf->path, path, strlen(path));
|
||||||
|
tFreeSFuncInfo(pFuncInfo);
|
||||||
|
taosArrayDestroy(retrieveRsp.pFuncInfos);
|
||||||
|
msgInfo->code = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
_return:
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
uv_sem_post(&msgInfo->resultSem);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
|
||||||
|
SRetrieveFuncReq retrieveReq = {0};
|
||||||
|
retrieveReq.numOfFuncs = 1;
|
||||||
|
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
|
||||||
|
taosArrayPush(retrieveReq.pFuncNames, udfName);
|
||||||
|
|
||||||
|
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
|
||||||
|
void *pReq = rpcMallocCont(contLen);
|
||||||
|
tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq);
|
||||||
|
taosArrayDestroy(retrieveReq.pFuncNames);
|
||||||
|
|
||||||
|
SUdfdRpcSendRecvInfo* msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
|
||||||
|
msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC;
|
||||||
|
msgInfo->param = udf;
|
||||||
|
uv_sem_init(&msgInfo->resultSem, 0);
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg = {0};
|
||||||
|
rpcMsg.pCont = pReq;
|
||||||
|
rpcMsg.contLen = contLen;
|
||||||
|
rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC;
|
||||||
|
rpcMsg.ahandle = msgInfo;
|
||||||
|
rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
|
||||||
|
|
||||||
|
uv_sem_wait(&msgInfo->resultSem);
|
||||||
|
uv_sem_destroy(&msgInfo->resultSem);
|
||||||
|
int32_t code = msgInfo->code;
|
||||||
|
taosMemoryFree(msgInfo);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t udfdConnectToMnode() {
|
||||||
|
SConnectReq connReq = {0};
|
||||||
|
connReq.connType = CONN_TYPE__UDFD;
|
||||||
|
tstrncpy(connReq.app, "udfd",sizeof(connReq.app));
|
||||||
|
tstrncpy(connReq.user, TSDB_DEFAULT_USER, sizeof(connReq.user));
|
||||||
|
char pass[TSDB_PASSWORD_LEN + 1] = {0};
|
||||||
|
taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass);
|
||||||
|
tstrncpy(connReq.passwd, pass, sizeof(connReq.passwd));
|
||||||
|
connReq.pid = htonl(taosGetPId());
|
||||||
|
connReq.startTime = htobe64(taosGetTimestampMs());
|
||||||
|
|
||||||
|
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq);
|
||||||
|
void* pReq = rpcMallocCont(contLen);
|
||||||
|
tSerializeSConnectReq(pReq, contLen, &connReq);
|
||||||
|
|
||||||
|
SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
|
||||||
|
msgInfo->rpcType = UDFD_RPC_MNODE_CONNECT;
|
||||||
|
uv_sem_init(&msgInfo->resultSem, 0);
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg = {0};
|
||||||
|
rpcMsg.msgType = TDMT_MND_CONNECT;
|
||||||
|
rpcMsg.pCont = pReq;
|
||||||
|
rpcMsg.contLen = contLen;
|
||||||
|
rpcMsg.ahandle = msgInfo;
|
||||||
|
rpcSendRequest(global.clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
|
||||||
|
|
||||||
|
uv_sem_wait(&msgInfo->resultSem);
|
||||||
|
int32_t code = msgInfo->code;
|
||||||
|
uv_sem_destroy(&msgInfo->resultSem);
|
||||||
|
taosMemoryFree(msgInfo);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
|
int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
|
||||||
strcpy(udf->name, udfName);
|
strcpy(udf->name, udfName);
|
||||||
int32_t err = 0;
|
int32_t err = 0;
|
||||||
|
|
||||||
err = udfdFillUdfInfoFromMNode(global.clientRpc, udf->name, udf);
|
err = udfdFillUdfInfoFromMNode(global.clientRpc, udf->name, udf);
|
||||||
if (err != 0) {
|
if (err != 0) {
|
||||||
fnError("can not retrieve udf from mnode. udf name %s", udfName);
|
fnError("can not retrieve udf from mnode. udf name %s", udfName);
|
||||||
|
@ -515,140 +646,6 @@ void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
|
||||||
uv_stop(global.loop);
|
uv_stop(global.loop);
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef enum EUdfdRpcReqRspType {
|
|
||||||
UDFD_RPC_MNODE_CONNECT = 0,
|
|
||||||
UDFD_RPC_RETRIVE_FUNC,
|
|
||||||
} EUdfdRpcReqRspType;
|
|
||||||
|
|
||||||
typedef struct SUdfdRpcSendRecvInfo {
|
|
||||||
EUdfdRpcReqRspType rpcType;
|
|
||||||
int32_t code;
|
|
||||||
void* param;
|
|
||||||
uv_sem_t resultSem;
|
|
||||||
} SUdfdRpcSendRecvInfo;
|
|
||||||
|
|
||||||
|
|
||||||
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
|
||||||
SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->ahandle;
|
|
||||||
ASSERT(pMsg->ahandle != NULL);
|
|
||||||
|
|
||||||
if (pEpSet) {
|
|
||||||
if (!isEpsetEqual(&global.mgmtEp.epSet, pEpSet)) {
|
|
||||||
updateEpSet_s(&global.mgmtEp, pEpSet);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pMsg->code != TSDB_CODE_SUCCESS) {
|
|
||||||
fnError("udfd rpc error. code: %s", tstrerror(pMsg->code));
|
|
||||||
msgInfo->code = pMsg->code;
|
|
||||||
goto _return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) {
|
|
||||||
SConnectRsp connectRsp = {0};
|
|
||||||
tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp);
|
|
||||||
if (connectRsp.epSet.numOfEps == 0) {
|
|
||||||
msgInfo->code = TSDB_CODE_MND_APP_ERROR;
|
|
||||||
goto _return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&global.mgmtEp.epSet, &connectRsp.epSet)) {
|
|
||||||
updateEpSet_s(&global.mgmtEp, &connectRsp.epSet);
|
|
||||||
}
|
|
||||||
msgInfo->code = 0;
|
|
||||||
} else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) {
|
|
||||||
SRetrieveFuncRsp retrieveRsp = {0};
|
|
||||||
tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp);
|
|
||||||
|
|
||||||
SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
|
|
||||||
SUdf* udf = msgInfo->param;
|
|
||||||
udf->funcType = pFuncInfo->funcType;
|
|
||||||
udf->scriptType = pFuncInfo->scriptType;
|
|
||||||
udf->outputType = pFuncInfo->funcType;
|
|
||||||
udf->outputLen = pFuncInfo->outputLen;
|
|
||||||
udf->bufSize = pFuncInfo->bufSize;
|
|
||||||
|
|
||||||
char path[PATH_MAX] = {0};
|
|
||||||
snprintf(path, sizeof(path), "%s/lib%s.so", "/tmp", pFuncInfo->name);
|
|
||||||
TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
|
|
||||||
// TODO check for failure of flush to disk
|
|
||||||
taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
|
|
||||||
taosCloseFile(&file);
|
|
||||||
strncpy(udf->path, path, strlen(path));
|
|
||||||
tFreeSFuncInfo(pFuncInfo);
|
|
||||||
taosArrayDestroy(retrieveRsp.pFuncInfos);
|
|
||||||
msgInfo->code = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
_return:
|
|
||||||
rpcFreeCont(pMsg->pCont);
|
|
||||||
uv_sem_post(&msgInfo->resultSem);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t udfdConnectToMNode() {
|
|
||||||
SConnectReq connReq = {0};
|
|
||||||
connReq.connType = CONN_TYPE__UDFD;
|
|
||||||
tstrncpy(connReq.app, "udfd",sizeof(connReq.app));
|
|
||||||
tstrncpy(connReq.user, TSDB_DEFAULT_USER, sizeof(connReq.user));
|
|
||||||
char pass[TSDB_PASSWORD_LEN + 1] = {0};
|
|
||||||
taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass);
|
|
||||||
tstrncpy(connReq.passwd, pass, sizeof(connReq.passwd));
|
|
||||||
connReq.pid = htonl(taosGetPId());
|
|
||||||
connReq.startTime = htobe64(taosGetTimestampMs());
|
|
||||||
|
|
||||||
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq);
|
|
||||||
void* pReq = rpcMallocCont(contLen);
|
|
||||||
tSerializeSConnectReq(pReq, contLen, &connReq);
|
|
||||||
|
|
||||||
SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
|
|
||||||
msgInfo->rpcType = UDFD_RPC_MNODE_CONNECT;
|
|
||||||
uv_sem_init(&msgInfo->resultSem, 0);
|
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {0};
|
|
||||||
rpcMsg.msgType = TDMT_MND_CONNECT;
|
|
||||||
rpcMsg.pCont = pReq;
|
|
||||||
rpcMsg.contLen = contLen;
|
|
||||||
rpcMsg.ahandle = msgInfo;
|
|
||||||
rpcSendRequest(global.clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
|
|
||||||
|
|
||||||
uv_sem_wait(&msgInfo->resultSem);
|
|
||||||
int32_t code = msgInfo->code;
|
|
||||||
uv_sem_destroy(&msgInfo->resultSem);
|
|
||||||
taosMemoryFree(msgInfo);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
|
|
||||||
SRetrieveFuncReq retrieveReq = {0};
|
|
||||||
retrieveReq.numOfFuncs = 1;
|
|
||||||
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
|
|
||||||
taosArrayPush(retrieveReq.pFuncNames, udfName);
|
|
||||||
|
|
||||||
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
|
|
||||||
void *pReq = rpcMallocCont(contLen);
|
|
||||||
tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq);
|
|
||||||
taosArrayDestroy(retrieveReq.pFuncNames);
|
|
||||||
|
|
||||||
SUdfdRpcSendRecvInfo* msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
|
|
||||||
msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC;
|
|
||||||
msgInfo->param = udf;
|
|
||||||
uv_sem_init(&msgInfo->resultSem, 0);
|
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {0};
|
|
||||||
rpcMsg.pCont = pReq;
|
|
||||||
rpcMsg.contLen = contLen;
|
|
||||||
rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC;
|
|
||||||
rpcMsg.ahandle = msgInfo;
|
|
||||||
rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
|
|
||||||
|
|
||||||
uv_sem_wait(&msgInfo->resultSem);
|
|
||||||
uv_sem_destroy(&msgInfo->resultSem);
|
|
||||||
int32_t code = msgInfo->code;
|
|
||||||
taosMemoryFree(msgInfo);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool udfdRpcRfp(int32_t code) {
|
static bool udfdRpcRfp(int32_t code) {
|
||||||
if (code == TSDB_CODE_RPC_REDIRECT) {
|
if (code == TSDB_CODE_RPC_REDIRECT) {
|
||||||
return true;
|
return true;
|
||||||
|
@ -884,7 +881,18 @@ int main(int argc, char *argv[]) {
|
||||||
return -3;
|
return -3;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (udfdConnectToMNode() != 0) {
|
int32_t retryMnodeTimes = 0;
|
||||||
|
int32_t code = 0;
|
||||||
|
while (retryMnodeTimes++ < TSDB_MAX_REPLICA) {
|
||||||
|
uv_sleep(500 * ( 1 << retryMnodeTimes));
|
||||||
|
code = udfdConnectToMnode();
|
||||||
|
if (code == 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
fnError("can not connect to mnode, code: %s. retry", tstrerror(code));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (code != 0) {
|
||||||
fnError("failed to start since can not connect to mnode");
|
fnError("failed to start since can not connect to mnode");
|
||||||
return -4;
|
return -4;
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,21 +41,19 @@ int scalarFuncTest() {
|
||||||
fnError("setup udf failure");
|
fnError("setup udf failure");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
int64_t beg = taosGetTimestampUs();
|
||||||
|
for (int k = 0; k < 1; ++k) {
|
||||||
SSDataBlock block = {0};
|
SSDataBlock block = {0};
|
||||||
SSDataBlock *pBlock = █
|
SSDataBlock *pBlock = █
|
||||||
pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
|
pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
|
||||||
pBlock->info.numOfCols = 1;
|
pBlock->info.numOfCols = 1;
|
||||||
pBlock->info.rows = 4;
|
pBlock->info.rows = 1024;
|
||||||
char data[16] = {0};
|
|
||||||
char bitmap[4] = {0};
|
|
||||||
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||||
SColumnInfoData colInfo = {0};
|
SColumnInfoData colInfo = {0};
|
||||||
colInfo.info.type = TSDB_DATA_TYPE_INT;
|
colInfo.info.type = TSDB_DATA_TYPE_INT;
|
||||||
colInfo.info.bytes = sizeof(int32_t);
|
colInfo.info.bytes = sizeof(int32_t);
|
||||||
colInfo.info.colId = 1;
|
colInfo.info.colId = 1;
|
||||||
colInfo.pData = data;
|
colInfoDataEnsureCapacity(&colInfo, 0, pBlock->info.rows);
|
||||||
colInfo.nullbitmap = bitmap;
|
|
||||||
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
|
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
|
||||||
colDataAppendInt32(&colInfo, j, &j);
|
colDataAppendInt32(&colInfo, j, &j);
|
||||||
}
|
}
|
||||||
|
@ -70,12 +68,14 @@ int scalarFuncTest() {
|
||||||
taosArrayDestroy(pBlock->pDataBlock);
|
taosArrayDestroy(pBlock->pDataBlock);
|
||||||
SColumnInfoData *col = output.columnData;
|
SColumnInfoData *col = output.columnData;
|
||||||
for (int32_t i = 0; i < output.numOfRows; ++i) {
|
for (int32_t i = 0; i < output.numOfRows; ++i) {
|
||||||
|
if (i % 100 == 0)
|
||||||
fprintf(stderr, "%d\t%d\n", i, *(int32_t *)(col->pData + i * sizeof(int32_t)));
|
fprintf(stderr, "%d\t%d\n", i, *(int32_t *)(col->pData + i * sizeof(int32_t)));
|
||||||
}
|
}
|
||||||
|
|
||||||
colDataDestroy(output.columnData);
|
colDataDestroy(output.columnData);
|
||||||
taosMemoryFree(output.columnData);
|
taosMemoryFree(output.columnData);
|
||||||
|
}
|
||||||
|
int64_t end = taosGetTimestampUs();
|
||||||
|
fprintf(stderr, "time: %f\n", (end-beg)/1000.0);
|
||||||
doTeardownUdf(handle);
|
doTeardownUdf(handle);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -93,16 +93,13 @@ int aggregateFuncTest() {
|
||||||
SSDataBlock *pBlock = █
|
SSDataBlock *pBlock = █
|
||||||
pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
|
pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
|
||||||
pBlock->info.numOfCols = 1;
|
pBlock->info.numOfCols = 1;
|
||||||
pBlock->info.rows = 4;
|
pBlock->info.rows = 1024;
|
||||||
char data[16] = {0};
|
|
||||||
char bitmap[4] = {0};
|
|
||||||
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||||
SColumnInfoData colInfo = {0};
|
SColumnInfoData colInfo = {0};
|
||||||
colInfo.info.type = TSDB_DATA_TYPE_INT;
|
colInfo.info.type = TSDB_DATA_TYPE_INT;
|
||||||
colInfo.info.bytes = sizeof(int32_t);
|
colInfo.info.bytes = sizeof(int32_t);
|
||||||
colInfo.info.colId = 1;
|
colInfo.info.colId = 1;
|
||||||
colInfo.pData = data;
|
colInfoDataEnsureCapacity(&colInfo, 0, pBlock->info.rows);
|
||||||
colInfo.nullbitmap = bitmap;
|
|
||||||
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
|
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
|
||||||
colDataAppendInt32(&colInfo, j, &j);
|
colDataAppendInt32(&colInfo, j, &j);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue