diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 192e04c9a3..e0ede63352 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1314,6 +1314,90 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { return err; } +int compareUdfcFuncSub(const void* elem1, const void* elem2) { + SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1; + SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2; + return strcmp(stub1->udfName, stub2->udfName); +} + +int32_t acquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) { + int32_t code = 0; + uv_mutex_lock(&gUdfdProxy.udfStubsMutex); + SUdfcFuncStub key = {0}; + strcpy(key.udfName, udfName); + int32_t stubIndex = taosArraySearchIdx(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ); + if (stubIndex != -1) { + SUdfcFuncStub *foundStub = taosArrayGet(gUdfdProxy.udfStubs, stubIndex); + UdfcFuncHandle handle = foundStub->handle; + if (handle != NULL && ((SUdfcUvSession*)handle)->udfUvPipe != NULL) { + *pHandle = foundStub->handle; + ++foundStub->refCount; + foundStub->lastRefTime = taosGetTimestampUs(); + uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); + return 0; + } else { + fnInfo("invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache", + udfName, foundStub->refCount, foundStub->lastRefTime); + taosArrayRemove(gUdfdProxy.udfStubs, stubIndex); + } + } + *pHandle = NULL; + code = doSetupUdf(udfName, pHandle); + if (code == TSDB_CODE_SUCCESS) { + SUdfcFuncStub stub = {0}; + strcpy(stub.udfName, udfName); + stub.handle = *pHandle; + ++stub.refCount; + stub.lastRefTime = taosGetTimestampUs(); + taosArrayPush(gUdfdProxy.udfStubs, &stub); + taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub); + } else { + *pHandle = NULL; + } + + uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); + return code; +} + +void releaseUdfFuncHandle(char* udfName) { + uv_mutex_lock(&gUdfdProxy.udfStubsMutex); + SUdfcFuncStub key = {0}; + strcpy(key.udfName, udfName); + SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ); + ASSERT(foundStub); + --foundStub->refCount; + ASSERT(foundStub->refCount>=0); + uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); +} + +int32_t cleanUpUdfs() { + uv_mutex_lock(&gUdfdProxy.udfStubsMutex); + int32_t i = 0; + SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); + while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) { + SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i); + if (stub->refCount == 0) { + fnInfo("tear down udf. udf name: %s, handle: %p", stub->udfName, stub->handle); + doTeardownUdf(stub->handle); + } else { + fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p", + stub->udfName, stub->refCount, stub->lastRefTime, stub->handle); + UdfcFuncHandle handle = stub->handle; + if (handle != NULL && ((SUdfcUvSession*)handle)->udfUvPipe != NULL) { + taosArrayPush(udfStubs, stub); + } else { + fnInfo("invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache", + stub->udfName, stub->refCount, stub->lastRefTime); + } + } + ++i; + } + taosArrayDestroy(gUdfdProxy.udfStubs); + gUdfdProxy.udfStubs = udfStubs; + uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); + return 0; +} + int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2, SSDataBlock* output, SUdfInterBuf *newState) { fnTrace("udfc call udf. callType: %d, funcHandle: %p", callType, handle); @@ -1437,57 +1521,10 @@ int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t return err; } -int compareUdfcFuncSub(const void* elem1, const void* elem2) { - SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1; - SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2; - return strcmp(stub1->udfName, stub2->udfName); -} - -int32_t accquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) { - int32_t code = 0; - uv_mutex_lock(&gUdfdProxy.udfStubsMutex); - SUdfcFuncStub key = {0}; - strcpy(key.udfName, udfName); - SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ); - if (foundStub != NULL) { - uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); - *pHandle = foundStub->handle; - ++foundStub->refCount; - foundStub->lastRefTime = taosGetTimestampUs(); - return 0; - } - *pHandle = NULL; - code = doSetupUdf(udfName, pHandle); - if (code == TSDB_CODE_SUCCESS) { - SUdfcFuncStub stub = {0}; - strcpy(stub.udfName, udfName); - stub.handle = *pHandle; - ++stub.refCount; - stub.lastRefTime = taosGetTimestampUs(); - taosArrayPush(gUdfdProxy.udfStubs, &stub); - taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub); - } else { - *pHandle = NULL; - } - - uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); - return code; -} - -void releaseUdfFuncHandle(char* udfName) { - uv_mutex_lock(&gUdfdProxy.udfStubsMutex); - SUdfcFuncStub key = {0}; - strcpy(key.udfName, udfName); - SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ); - ASSERT(foundStub); - --foundStub->refCount; - ASSERT(foundStub->refCount>=0); - uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); -} int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) { UdfcFuncHandle handle = NULL; - int32_t code = accquireUdfFuncHandle(udfName, &handle); + int32_t code = acquireUdfFuncHandle(udfName, &handle); if (code != 0) { return code; } @@ -1549,7 +1586,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult } UdfcFuncHandle handle; int32_t udfCode = 0; - if ((udfCode = accquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) { + if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) { fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode); return false; } @@ -1662,25 +1699,3 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { releaseUdfFuncHandle(pCtx->udfName); return udfCallCode == 0 ? numOfResults : udfCallCode; } - -int32_t cleanUpUdfs() { - uv_mutex_lock(&gUdfdProxy.udfStubsMutex); - int32_t i = 0; - SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); - while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) { - SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i); - if (stub->refCount == 0) { - fnInfo("tear down udf. udf name: %s, handle: %p", stub->udfName, stub->handle); - doTeardownUdf(stub->handle); - } else { - fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p", - stub->udfName, stub->refCount, stub->lastRefTime, stub->handle); - taosArrayPush(udfStubs, stub); - } - ++i; - } - taosArrayDestroy(gUdfdProxy.udfStubs); - gUdfdProxy.udfStubs = udfStubs; - uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); - return 0; -} \ No newline at end of file diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 1bb775de5b..9b6adb5a64 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -86,17 +86,148 @@ typedef struct SUdf { TUdfDestroyFunc destroyFunc; } SUdf; -// TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix // TODO: add private udf structure. typedef struct SUdfcFuncHandle { SUdf *udf; } SUdfcFuncHandle; -int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf); +typedef enum EUdfdRpcReqRspType { + UDFD_RPC_MNODE_CONNECT = 0, + UDFD_RPC_RETRIVE_FUNC, +} EUdfdRpcReqRspType; + +typedef struct SUdfdRpcSendRecvInfo { + EUdfdRpcReqRspType rpcType; + int32_t code; + void* param; + uv_sem_t resultSem; +} SUdfdRpcSendRecvInfo; + +void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { + SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->info.ahandle; + ASSERT(pMsg->info.ahandle != NULL); + + if (pEpSet) { + if (!isEpsetEqual(&global.mgmtEp.epSet, pEpSet)) { + updateEpSet_s(&global.mgmtEp, pEpSet); + } + } + + if (pMsg->code != TSDB_CODE_SUCCESS) { + fnError("udfd rpc error. code: %s", tstrerror(pMsg->code)); + msgInfo->code = pMsg->code; + goto _return; + } + + if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) { + SConnectRsp connectRsp = {0}; + tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp); + if (connectRsp.epSet.numOfEps == 0) { + msgInfo->code = TSDB_CODE_MND_APP_ERROR; + goto _return; + } + + if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&global.mgmtEp.epSet, &connectRsp.epSet)) { + updateEpSet_s(&global.mgmtEp, &connectRsp.epSet); + } + msgInfo->code = 0; + } else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) { + SRetrieveFuncRsp retrieveRsp = {0}; + tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp); + + SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0); + SUdf* udf = msgInfo->param; + udf->funcType = pFuncInfo->funcType; + udf->scriptType = pFuncInfo->scriptType; + udf->outputType = pFuncInfo->funcType; + udf->outputLen = pFuncInfo->outputLen; + udf->bufSize = pFuncInfo->bufSize; + + char path[PATH_MAX] = {0}; + snprintf(path, sizeof(path), "%s/lib%s.so", "/tmp", pFuncInfo->name); + TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL); + // TODO check for failure of flush to disk + taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize); + taosCloseFile(&file); + strncpy(udf->path, path, strlen(path)); + tFreeSFuncInfo(pFuncInfo); + taosArrayDestroy(retrieveRsp.pFuncInfos); + msgInfo->code = 0; + } + +_return: + rpcFreeCont(pMsg->pCont); + uv_sem_post(&msgInfo->resultSem); + return; +} + +int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) { + SRetrieveFuncReq retrieveReq = {0}; + retrieveReq.numOfFuncs = 1; + retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); + taosArrayPush(retrieveReq.pFuncNames, udfName); + + int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); + void *pReq = rpcMallocCont(contLen); + tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq); + taosArrayDestroy(retrieveReq.pFuncNames); + + SUdfdRpcSendRecvInfo* msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo)); + msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC; + msgInfo->param = udf; + uv_sem_init(&msgInfo->resultSem, 0); + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = contLen; + rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC; + rpcMsg.info.ahandle = msgInfo; + rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL); + + uv_sem_wait(&msgInfo->resultSem); + uv_sem_destroy(&msgInfo->resultSem); + int32_t code = msgInfo->code; + taosMemoryFree(msgInfo); + return code; +} + +int32_t udfdConnectToMnode() { + SConnectReq connReq = {0}; + connReq.connType = CONN_TYPE__UDFD; + tstrncpy(connReq.app, "udfd",sizeof(connReq.app)); + tstrncpy(connReq.user, TSDB_DEFAULT_USER, sizeof(connReq.user)); + char pass[TSDB_PASSWORD_LEN + 1] = {0}; + taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass); + tstrncpy(connReq.passwd, pass, sizeof(connReq.passwd)); + connReq.pid = htonl(taosGetPId()); + connReq.startTime = htobe64(taosGetTimestampMs()); + + int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSConnectReq(pReq, contLen, &connReq); + + SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo)); + msgInfo->rpcType = UDFD_RPC_MNODE_CONNECT; + uv_sem_init(&msgInfo->resultSem, 0); + + SRpcMsg rpcMsg = {0}; + rpcMsg.msgType = TDMT_MND_CONNECT; + rpcMsg.pCont = pReq; + rpcMsg.contLen = contLen; + rpcMsg.info.ahandle = msgInfo; + rpcSendRequest(global.clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL); + + uv_sem_wait(&msgInfo->resultSem); + int32_t code = msgInfo->code; + uv_sem_destroy(&msgInfo->resultSem); + taosMemoryFree(msgInfo); + return code; +} int32_t udfdLoadUdf(char *udfName, SUdf *udf) { strcpy(udf->name, udfName); int32_t err = 0; + err = udfdFillUdfInfoFromMNode(global.clientRpc, udf->name, udf); if (err != 0) { fnError("can not retrieve udf from mnode. udf name %s", udfName); @@ -515,140 +646,6 @@ void udfdIntrSignalHandler(uv_signal_t *handle, int signum) { uv_stop(global.loop); } -typedef enum EUdfdRpcReqRspType { - UDFD_RPC_MNODE_CONNECT = 0, - UDFD_RPC_RETRIVE_FUNC, -} EUdfdRpcReqRspType; - -typedef struct SUdfdRpcSendRecvInfo { - EUdfdRpcReqRspType rpcType; - int32_t code; - void* param; - uv_sem_t resultSem; -} SUdfdRpcSendRecvInfo; - - -void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { - SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->info.ahandle; - ASSERT(pMsg->info.ahandle != NULL); - - if (pEpSet) { - if (!isEpsetEqual(&global.mgmtEp.epSet, pEpSet)) { - updateEpSet_s(&global.mgmtEp, pEpSet); - } - } - - if (pMsg->code != TSDB_CODE_SUCCESS) { - fnError("udfd rpc error. code: %s", tstrerror(pMsg->code)); - msgInfo->code = pMsg->code; - goto _return; - } - - if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) { - SConnectRsp connectRsp = {0}; - tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp); - if (connectRsp.epSet.numOfEps == 0) { - msgInfo->code = TSDB_CODE_MND_APP_ERROR; - goto _return; - } - - if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&global.mgmtEp.epSet, &connectRsp.epSet)) { - updateEpSet_s(&global.mgmtEp, &connectRsp.epSet); - } - msgInfo->code = 0; - } else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) { - SRetrieveFuncRsp retrieveRsp = {0}; - tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp); - - SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0); - SUdf* udf = msgInfo->param; - udf->funcType = pFuncInfo->funcType; - udf->scriptType = pFuncInfo->scriptType; - udf->outputType = pFuncInfo->funcType; - udf->outputLen = pFuncInfo->outputLen; - udf->bufSize = pFuncInfo->bufSize; - - char path[PATH_MAX] = {0}; - snprintf(path, sizeof(path), "%s/lib%s.so", "/tmp", pFuncInfo->name); - TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL); - // TODO check for failure of flush to disk - taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize); - taosCloseFile(&file); - strncpy(udf->path, path, strlen(path)); - tFreeSFuncInfo(pFuncInfo); - taosArrayDestroy(retrieveRsp.pFuncInfos); - msgInfo->code = 0; - } - -_return: - rpcFreeCont(pMsg->pCont); - uv_sem_post(&msgInfo->resultSem); - return; -} - -int32_t udfdConnectToMNode() { - SConnectReq connReq = {0}; - connReq.connType = CONN_TYPE__UDFD; - tstrncpy(connReq.app, "udfd",sizeof(connReq.app)); - tstrncpy(connReq.user, TSDB_DEFAULT_USER, sizeof(connReq.user)); - char pass[TSDB_PASSWORD_LEN + 1] = {0}; - taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass); - tstrncpy(connReq.passwd, pass, sizeof(connReq.passwd)); - connReq.pid = htonl(taosGetPId()); - connReq.startTime = htobe64(taosGetTimestampMs()); - - int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq); - void* pReq = rpcMallocCont(contLen); - tSerializeSConnectReq(pReq, contLen, &connReq); - - SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo)); - msgInfo->rpcType = UDFD_RPC_MNODE_CONNECT; - uv_sem_init(&msgInfo->resultSem, 0); - - SRpcMsg rpcMsg = {0}; - rpcMsg.msgType = TDMT_MND_CONNECT; - rpcMsg.pCont = pReq; - rpcMsg.contLen = contLen; - rpcMsg.info.ahandle = msgInfo; - rpcSendRequest(global.clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL); - - uv_sem_wait(&msgInfo->resultSem); - int32_t code = msgInfo->code; - uv_sem_destroy(&msgInfo->resultSem); - taosMemoryFree(msgInfo); - return code; -} - -int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) { - SRetrieveFuncReq retrieveReq = {0}; - retrieveReq.numOfFuncs = 1; - retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); - taosArrayPush(retrieveReq.pFuncNames, udfName); - - int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); - void *pReq = rpcMallocCont(contLen); - tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq); - taosArrayDestroy(retrieveReq.pFuncNames); - - SUdfdRpcSendRecvInfo* msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo)); - msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC; - msgInfo->param = udf; - uv_sem_init(&msgInfo->resultSem, 0); - - SRpcMsg rpcMsg = {0}; - rpcMsg.pCont = pReq; - rpcMsg.contLen = contLen; - rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC; - rpcMsg.info.ahandle = msgInfo; - rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL); - - uv_sem_wait(&msgInfo->resultSem); - uv_sem_destroy(&msgInfo->resultSem); - int32_t code = msgInfo->code; - taosMemoryFree(msgInfo); - return code; -} - static bool udfdRpcRfp(int32_t code) { if (code == TSDB_CODE_RPC_REDIRECT) { return true; @@ -884,7 +881,18 @@ int main(int argc, char *argv[]) { return -3; } - if (udfdConnectToMNode() != 0) { + int32_t retryMnodeTimes = 0; + int32_t code = 0; + while (retryMnodeTimes++ < TSDB_MAX_REPLICA) { + uv_sleep(500 * ( 1 << retryMnodeTimes)); + code = udfdConnectToMnode(); + if (code == 0) { + break; + } + fnError("can not connect to mnode, code: %s. retry", tstrerror(code)); + } + + if (code != 0) { fnError("failed to start since can not connect to mnode"); return -4; } diff --git a/source/libs/function/test/runUdf.c b/source/libs/function/test/runUdf.c index 9fe9269a3f..15109db220 100644 --- a/source/libs/function/test/runUdf.c +++ b/source/libs/function/test/runUdf.c @@ -41,41 +41,41 @@ int scalarFuncTest() { fnError("setup udf failure"); return -1; } - - SSDataBlock block = {0}; - SSDataBlock *pBlock = █ - pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); - pBlock->info.numOfCols = 1; - pBlock->info.rows = 4; - char data[16] = {0}; - char bitmap[4] = {0}; - for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - SColumnInfoData colInfo = {0}; - colInfo.info.type = TSDB_DATA_TYPE_INT; - colInfo.info.bytes = sizeof(int32_t); - colInfo.info.colId = 1; - colInfo.pData = data; - colInfo.nullbitmap = bitmap; - for (int32_t j = 0; j < pBlock->info.rows; ++j) { - colDataAppendInt32(&colInfo, j, &j); + int64_t beg = taosGetTimestampUs(); + for (int k = 0; k < 1; ++k) { + SSDataBlock block = {0}; + SSDataBlock *pBlock = █ + pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); + pBlock->info.numOfCols = 1; + pBlock->info.rows = 1024; + for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData colInfo = {0}; + colInfo.info.type = TSDB_DATA_TYPE_INT; + colInfo.info.bytes = sizeof(int32_t); + colInfo.info.colId = 1; + colInfoDataEnsureCapacity(&colInfo, 0, pBlock->info.rows); + for (int32_t j = 0; j < pBlock->info.rows; ++j) { + colDataAppendInt32(&colInfo, j, &j); + } + taosArrayPush(pBlock->pDataBlock, &colInfo); } - taosArrayPush(pBlock->pDataBlock, &colInfo); + + SScalarParam input = {0}; + input.numOfRows = pBlock->info.rows; + input.columnData = taosArrayGet(pBlock->pDataBlock, 0); + SScalarParam output = {0}; + doCallUdfScalarFunc(handle, &input, 1, &output); + taosArrayDestroy(pBlock->pDataBlock); + SColumnInfoData *col = output.columnData; + for (int32_t i = 0; i < output.numOfRows; ++i) { + if (i % 100 == 0) + fprintf(stderr, "%d\t%d\n", i, *(int32_t *)(col->pData + i * sizeof(int32_t))); + } + colDataDestroy(output.columnData); + taosMemoryFree(output.columnData); } - - SScalarParam input = {0}; - input.numOfRows = pBlock->info.rows; - input.columnData = taosArrayGet(pBlock->pDataBlock, 0); - SScalarParam output = {0}; - doCallUdfScalarFunc(handle, &input, 1, &output); - taosArrayDestroy(pBlock->pDataBlock); - SColumnInfoData *col = output.columnData; - for (int32_t i = 0; i < output.numOfRows; ++i) { - fprintf(stderr, "%d\t%d\n", i, *(int32_t *)(col->pData + i * sizeof(int32_t))); - } - - colDataDestroy(output.columnData); - taosMemoryFree(output.columnData); - + int64_t end = taosGetTimestampUs(); + fprintf(stderr, "time: %f\n", (end-beg)/1000.0); doTeardownUdf(handle); return 0; @@ -93,16 +93,13 @@ int aggregateFuncTest() { SSDataBlock *pBlock = █ pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); pBlock->info.numOfCols = 1; - pBlock->info.rows = 4; - char data[16] = {0}; - char bitmap[4] = {0}; + pBlock->info.rows = 1024; for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { SColumnInfoData colInfo = {0}; colInfo.info.type = TSDB_DATA_TYPE_INT; colInfo.info.bytes = sizeof(int32_t); colInfo.info.colId = 1; - colInfo.pData = data; - colInfo.nullbitmap = bitmap; + colInfoDataEnsureCapacity(&colInfo, 0, pBlock->info.rows); for (int32_t j = 0; j < pBlock->info.rows; ++j) { colDataAppendInt32(&colInfo, j, &j); }