fix: clear invalid udf func handle
This commit is contained in:
parent
e73b1c4889
commit
ee2f3e7e05
|
@ -1314,6 +1314,90 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int compareUdfcFuncSub(const void* elem1, const void* elem2) {
|
||||||
|
SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1;
|
||||||
|
SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2;
|
||||||
|
return strcmp(stub1->udfName, stub2->udfName);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t acquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) {
|
||||||
|
int32_t code = 0;
|
||||||
|
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
|
||||||
|
SUdfcFuncStub key = {0};
|
||||||
|
strcpy(key.udfName, udfName);
|
||||||
|
int32_t stubIndex = taosArraySearchIdx(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
|
||||||
|
if (stubIndex != -1) {
|
||||||
|
SUdfcFuncStub *foundStub = taosArrayGet(gUdfdProxy.udfStubs, stubIndex);
|
||||||
|
UdfcFuncHandle handle = foundStub->handle;
|
||||||
|
if (handle != NULL && ((SUdfcUvSession*)handle)->udfUvPipe != NULL) {
|
||||||
|
*pHandle = foundStub->handle;
|
||||||
|
++foundStub->refCount;
|
||||||
|
foundStub->lastRefTime = taosGetTimestampUs();
|
||||||
|
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
fnInfo("invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache",
|
||||||
|
udfName, foundStub->refCount, foundStub->lastRefTime);
|
||||||
|
taosArrayRemove(gUdfdProxy.udfStubs, stubIndex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*pHandle = NULL;
|
||||||
|
code = doSetupUdf(udfName, pHandle);
|
||||||
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
SUdfcFuncStub stub = {0};
|
||||||
|
strcpy(stub.udfName, udfName);
|
||||||
|
stub.handle = *pHandle;
|
||||||
|
++stub.refCount;
|
||||||
|
stub.lastRefTime = taosGetTimestampUs();
|
||||||
|
taosArrayPush(gUdfdProxy.udfStubs, &stub);
|
||||||
|
taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub);
|
||||||
|
} else {
|
||||||
|
*pHandle = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
void releaseUdfFuncHandle(char* udfName) {
|
||||||
|
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
|
||||||
|
SUdfcFuncStub key = {0};
|
||||||
|
strcpy(key.udfName, udfName);
|
||||||
|
SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
|
||||||
|
ASSERT(foundStub);
|
||||||
|
--foundStub->refCount;
|
||||||
|
ASSERT(foundStub->refCount>=0);
|
||||||
|
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t cleanUpUdfs() {
|
||||||
|
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
|
||||||
|
int32_t i = 0;
|
||||||
|
SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
|
||||||
|
while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) {
|
||||||
|
SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i);
|
||||||
|
if (stub->refCount == 0) {
|
||||||
|
fnInfo("tear down udf. udf name: %s, handle: %p", stub->udfName, stub->handle);
|
||||||
|
doTeardownUdf(stub->handle);
|
||||||
|
} else {
|
||||||
|
fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p",
|
||||||
|
stub->udfName, stub->refCount, stub->lastRefTime, stub->handle);
|
||||||
|
UdfcFuncHandle handle = stub->handle;
|
||||||
|
if (handle != NULL && ((SUdfcUvSession*)handle)->udfUvPipe != NULL) {
|
||||||
|
taosArrayPush(udfStubs, stub);
|
||||||
|
} else {
|
||||||
|
fnInfo("invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache",
|
||||||
|
stub->udfName, stub->refCount, stub->lastRefTime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
++i;
|
||||||
|
}
|
||||||
|
taosArrayDestroy(gUdfdProxy.udfStubs);
|
||||||
|
gUdfdProxy.udfStubs = udfStubs;
|
||||||
|
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
|
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
|
||||||
SSDataBlock* output, SUdfInterBuf *newState) {
|
SSDataBlock* output, SUdfInterBuf *newState) {
|
||||||
fnTrace("udfc call udf. callType: %d, funcHandle: %p", callType, handle);
|
fnTrace("udfc call udf. callType: %d, funcHandle: %p", callType, handle);
|
||||||
|
@ -1437,57 +1521,10 @@ int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
int compareUdfcFuncSub(const void* elem1, const void* elem2) {
|
|
||||||
SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1;
|
|
||||||
SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2;
|
|
||||||
return strcmp(stub1->udfName, stub2->udfName);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t accquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) {
|
|
||||||
int32_t code = 0;
|
|
||||||
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
|
|
||||||
SUdfcFuncStub key = {0};
|
|
||||||
strcpy(key.udfName, udfName);
|
|
||||||
SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
|
|
||||||
if (foundStub != NULL) {
|
|
||||||
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
|
|
||||||
*pHandle = foundStub->handle;
|
|
||||||
++foundStub->refCount;
|
|
||||||
foundStub->lastRefTime = taosGetTimestampUs();
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
*pHandle = NULL;
|
|
||||||
code = doSetupUdf(udfName, pHandle);
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
|
||||||
SUdfcFuncStub stub = {0};
|
|
||||||
strcpy(stub.udfName, udfName);
|
|
||||||
stub.handle = *pHandle;
|
|
||||||
++stub.refCount;
|
|
||||||
stub.lastRefTime = taosGetTimestampUs();
|
|
||||||
taosArrayPush(gUdfdProxy.udfStubs, &stub);
|
|
||||||
taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub);
|
|
||||||
} else {
|
|
||||||
*pHandle = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
void releaseUdfFuncHandle(char* udfName) {
|
|
||||||
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
|
|
||||||
SUdfcFuncStub key = {0};
|
|
||||||
strcpy(key.udfName, udfName);
|
|
||||||
SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
|
|
||||||
ASSERT(foundStub);
|
|
||||||
--foundStub->refCount;
|
|
||||||
ASSERT(foundStub->refCount>=0);
|
|
||||||
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
|
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
|
||||||
UdfcFuncHandle handle = NULL;
|
UdfcFuncHandle handle = NULL;
|
||||||
int32_t code = accquireUdfFuncHandle(udfName, &handle);
|
int32_t code = acquireUdfFuncHandle(udfName, &handle);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1549,7 +1586,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
|
||||||
}
|
}
|
||||||
UdfcFuncHandle handle;
|
UdfcFuncHandle handle;
|
||||||
int32_t udfCode = 0;
|
int32_t udfCode = 0;
|
||||||
if ((udfCode = accquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
|
if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
|
||||||
fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode);
|
fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -1662,25 +1699,3 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
|
||||||
releaseUdfFuncHandle(pCtx->udfName);
|
releaseUdfFuncHandle(pCtx->udfName);
|
||||||
return udfCallCode == 0 ? numOfResults : udfCallCode;
|
return udfCallCode == 0 ? numOfResults : udfCallCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t cleanUpUdfs() {
|
|
||||||
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
|
|
||||||
int32_t i = 0;
|
|
||||||
SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
|
|
||||||
while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) {
|
|
||||||
SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i);
|
|
||||||
if (stub->refCount == 0) {
|
|
||||||
fnInfo("tear down udf. udf name: %s, handle: %p", stub->udfName, stub->handle);
|
|
||||||
doTeardownUdf(stub->handle);
|
|
||||||
} else {
|
|
||||||
fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p",
|
|
||||||
stub->udfName, stub->refCount, stub->lastRefTime, stub->handle);
|
|
||||||
taosArrayPush(udfStubs, stub);
|
|
||||||
}
|
|
||||||
++i;
|
|
||||||
}
|
|
||||||
taosArrayDestroy(gUdfdProxy.udfStubs);
|
|
||||||
gUdfdProxy.udfStubs = udfStubs;
|
|
||||||
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
|
|
||||||
return 0;
|
|
||||||
}
|
|
Loading…
Reference in New Issue