Merge branch '3.0' of github.com:taosdata/TDengine into 3.0
This commit is contained in:
commit
7e86ae9ffe
|
@ -70,7 +70,7 @@ typedef uint16_t tmsg_t;
|
||||||
#define TSDB_IE_TYPE_DNODE_EXT 6
|
#define TSDB_IE_TYPE_DNODE_EXT 6
|
||||||
#define TSDB_IE_TYPE_DNODE_STATE 7
|
#define TSDB_IE_TYPE_DNODE_STATE 7
|
||||||
|
|
||||||
enum { CONN_TYPE__QUERY = 1, CONN_TYPE__TMQ, CONN_TYPE__MAX };
|
enum { CONN_TYPE__QUERY = 1, CONN_TYPE__TMQ, CONN_TYPE__UDFD, CONN_TYPE__MAX };
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
HEARTBEAT_KEY_USER_AUTHINFO = 1,
|
HEARTBEAT_KEY_USER_AUTHINFO = 1,
|
||||||
|
|
|
@ -105,7 +105,12 @@ int32_t mavgFunction(SqlFunctionCtx* pCtx);
|
||||||
bool getSampleFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getSampleFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool sampleFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool sampleFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
int32_t sampleFunction(SqlFunctionCtx* pCtx);
|
int32_t sampleFunction(SqlFunctionCtx* pCtx);
|
||||||
int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
//int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
|
bool getTailFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
|
bool tailFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
|
int32_t tailFunction(SqlFunctionCtx* pCtx);
|
||||||
|
int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
|
|
||||||
|
|
|
@ -386,6 +386,35 @@ static int32_t translateSample(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t translateTail(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
|
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
|
||||||
|
if (2 != numOfParams && 3 != numOfParams) {
|
||||||
|
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
|
if (QUERY_NODE_COLUMN != nodeType(pPara)) {
|
||||||
|
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||||
|
"The input parameter of TAIL function can only be column");
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 1; i < numOfParams; ++i) {
|
||||||
|
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, i))->resType.type;
|
||||||
|
if (!IS_INTEGER_TYPE(paraType)) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SExprNode* pCol = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
|
uint8_t colType = pCol->resType.type;
|
||||||
|
if (IS_VAR_DATA_TYPE(colType)) {
|
||||||
|
pFunc->node.resType = (SDataType){.bytes = pCol->resType.bytes, .type = colType};
|
||||||
|
} else {
|
||||||
|
pFunc->node.resType = (SDataType){.bytes = tDataTypes[colType].bytes, .type = colType};
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
// todo
|
// todo
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -850,6 +879,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.processFunc = sampleFunction,
|
.processFunc = sampleFunction,
|
||||||
.finalizeFunc = NULL
|
.finalizeFunc = NULL
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
.name = "tail",
|
||||||
|
.type = FUNCTION_TYPE_TAIL,
|
||||||
|
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||||
|
.translateFunc = translateTail,
|
||||||
|
.getEnvFunc = getTailFuncEnv,
|
||||||
|
.initFunc = tailFunctionSetup,
|
||||||
|
.processFunc = tailFunction,
|
||||||
|
.finalizeFunc = tailFinalize
|
||||||
|
},
|
||||||
{
|
{
|
||||||
.name = "abs",
|
.name = "abs",
|
||||||
.type = FUNCTION_TYPE_ABS,
|
.type = FUNCTION_TYPE_ABS,
|
||||||
|
|
|
@ -18,12 +18,15 @@
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
#include "querynodes.h"
|
#include "querynodes.h"
|
||||||
#include "taggfunction.h"
|
#include "taggfunction.h"
|
||||||
|
#include "tcompare.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
#include "tpercentile.h"
|
#include "tpercentile.h"
|
||||||
|
|
||||||
#define HISTOGRAM_MAX_BINS_NUM 1000
|
#define HISTOGRAM_MAX_BINS_NUM 1000
|
||||||
#define MAVG_MAX_POINTS_NUM 1000
|
#define MAVG_MAX_POINTS_NUM 1000
|
||||||
#define SAMPLE_MAX_POINTS_NUM 1000
|
#define SAMPLE_MAX_POINTS_NUM 1000
|
||||||
|
#define TAIL_MAX_POINTS_NUM 100
|
||||||
|
#define TAIL_MAX_OFFSET 100
|
||||||
|
|
||||||
typedef struct SSumRes {
|
typedef struct SSumRes {
|
||||||
union {
|
union {
|
||||||
|
@ -161,6 +164,21 @@ typedef struct SSampleInfo {
|
||||||
int64_t *timestamp;
|
int64_t *timestamp;
|
||||||
} SSampleInfo;
|
} SSampleInfo;
|
||||||
|
|
||||||
|
typedef struct STailItem {
|
||||||
|
int64_t timestamp;
|
||||||
|
bool isNull;
|
||||||
|
char data[];
|
||||||
|
} STailItem;
|
||||||
|
|
||||||
|
typedef struct STailInfo {
|
||||||
|
int32_t numOfPoints;
|
||||||
|
int32_t numAdded;
|
||||||
|
int32_t offset;
|
||||||
|
uint8_t colType;
|
||||||
|
int16_t colBytes;
|
||||||
|
STailItem **pItems;
|
||||||
|
} STailInfo;
|
||||||
|
|
||||||
#define SET_VAL(_info, numOfElem, res) \
|
#define SET_VAL(_info, numOfElem, res) \
|
||||||
do { \
|
do { \
|
||||||
if ((numOfElem) <= 0) { \
|
if ((numOfElem) <= 0) { \
|
||||||
|
@ -3107,7 +3125,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
int32_t startOffset = pCtx->offset;
|
int32_t startOffset = pCtx->offset;
|
||||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
if (colDataIsNull_s(pInputCol, i)) {
|
||||||
//colDataAppendNULL(pOutput, i);
|
//colDataAppendNULL(pOutput, i);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -3141,3 +3159,132 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
|
||||||
//
|
//
|
||||||
// return pResInfo->numOfRes;
|
// return pResInfo->numOfRes;
|
||||||
//}
|
//}
|
||||||
|
|
||||||
|
|
||||||
|
bool getTailFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
|
SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
|
SValueNode* pVal = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
|
||||||
|
int32_t numOfPoints = pVal->datum.i;
|
||||||
|
pEnv->calcMemSize = sizeof(STailInfo) + numOfPoints * (POINTER_BYTES + sizeof(STailItem) + pCol->node.resType.bytes);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool tailFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo *pResultInfo) {
|
||||||
|
if (!functionSetup(pCtx, pResultInfo)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
STailInfo *pInfo = GET_ROWCELL_INTERBUF(pResultInfo);
|
||||||
|
pInfo->numAdded = 0;
|
||||||
|
pInfo->numOfPoints = pCtx->param[1].param.i;
|
||||||
|
if (pCtx->numOfParams == 4) {
|
||||||
|
pInfo->offset = pCtx->param[2].param.i;
|
||||||
|
} else {
|
||||||
|
pInfo->offset = 0;
|
||||||
|
}
|
||||||
|
pInfo->colType = pCtx->resDataInfo.type;
|
||||||
|
pInfo->colBytes = pCtx->resDataInfo.bytes;
|
||||||
|
if ((pInfo->numOfPoints < 1 || pInfo->numOfPoints > TAIL_MAX_POINTS_NUM) ||
|
||||||
|
(pInfo->numOfPoints < 0 || pInfo->numOfPoints > TAIL_MAX_OFFSET)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->pItems = (STailItem **)((char *)pInfo + sizeof(STailInfo));
|
||||||
|
char *pItem = (char *)pInfo->pItems + pInfo->numOfPoints * POINTER_BYTES;
|
||||||
|
|
||||||
|
size_t unitSize = sizeof(STailItem) + pInfo->colBytes;
|
||||||
|
for (int32_t i = 0; i < pInfo->numOfPoints; ++i) {
|
||||||
|
pInfo->pItems[i] = (STailItem *)(pItem + i * unitSize);
|
||||||
|
pInfo->pItems[i]->isNull = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void tailAssignResult(STailItem* pItem, char *data, int32_t colBytes, TSKEY ts, bool isNull) {
|
||||||
|
pItem->timestamp = ts;
|
||||||
|
if (isNull) {
|
||||||
|
pItem->isNull = true;
|
||||||
|
} else {
|
||||||
|
memcpy(pItem->data, data, colBytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tailCompFn(const void *p1, const void *p2, const void *param) {
|
||||||
|
STailItem *d1 = *(STailItem **)p1;
|
||||||
|
STailItem *d2 = *(STailItem **)p2;
|
||||||
|
return compareInt64Val(&d1->timestamp, &d2->timestamp);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void doTailAdd(STailInfo* pInfo, char *data, TSKEY ts, bool isNull) {
|
||||||
|
STailItem **pList = pInfo->pItems;
|
||||||
|
if (pInfo->numAdded < pInfo->numOfPoints) {
|
||||||
|
tailAssignResult(pList[pInfo->numAdded], data, pInfo->colBytes, ts, isNull);
|
||||||
|
taosheapsort((void *)pList, sizeof(STailItem **), pInfo->numAdded + 1, NULL, tailCompFn, 0);
|
||||||
|
pInfo->numAdded++;
|
||||||
|
} else if (pList[0]->timestamp < ts) {
|
||||||
|
tailAssignResult(pList[0], data, pInfo->colBytes, ts, isNull);
|
||||||
|
taosheapadjust((void *)pList, sizeof(STailItem **), 0, pInfo->numOfPoints - 1, NULL, tailCompFn, NULL, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tailFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
STailInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
|
||||||
|
|
||||||
|
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||||
|
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
|
||||||
|
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
|
||||||
|
|
||||||
|
int32_t startOffset = pCtx->offset;
|
||||||
|
if (pInfo->offset >= pInput->numOfRows) {
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
pInfo->numOfPoints = MIN(pInfo->numOfPoints, pInput->numOfRows - pInfo->offset);
|
||||||
|
}
|
||||||
|
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex - pInfo->offset; i += 1) {
|
||||||
|
|
||||||
|
char* data = colDataGetData(pInputCol, i);
|
||||||
|
doTailAdd(pInfo, data, tsList[i], colDataIsNull_s(pInputCol, i));
|
||||||
|
}
|
||||||
|
|
||||||
|
taosqsort(pInfo->pItems, pInfo->numOfPoints, POINTER_BYTES, NULL, tailCompFn);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pInfo->numOfPoints; ++i) {
|
||||||
|
int32_t pos = startOffset + i;
|
||||||
|
STailItem *pItem = pInfo->pItems[i];
|
||||||
|
if (pItem->isNull) {
|
||||||
|
colDataAppendNULL(pOutput, pos);
|
||||||
|
} else {
|
||||||
|
colDataAppend(pOutput, pos, pItem->data, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return pInfo->numOfPoints;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
|
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||||
|
STailInfo* pInfo = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||||
|
pEntryInfo->complete = true;
|
||||||
|
|
||||||
|
int32_t type = pCtx->input.pData[0]->info.type;
|
||||||
|
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||||
|
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
|
||||||
|
// todo assign the tag value and the corresponding row data
|
||||||
|
int32_t currentRow = pBlock->info.rows;
|
||||||
|
for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) {
|
||||||
|
STailItem *pItem = pInfo->pItems[i];
|
||||||
|
colDataAppend(pCol, currentRow, pItem->data, false);
|
||||||
|
|
||||||
|
//setSelectivityValue(pCtx, pBlock, &pInfo->pItems[i].tuplePos, currentRow);
|
||||||
|
currentRow += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pEntryInfo->numOfRes;
|
||||||
|
}
|
||||||
|
|
|
@ -312,6 +312,7 @@ typedef struct SUdfcFuncStub {
|
||||||
char udfName[TSDB_FUNC_NAME_LEN];
|
char udfName[TSDB_FUNC_NAME_LEN];
|
||||||
UdfcFuncHandle handle;
|
UdfcFuncHandle handle;
|
||||||
int32_t refCount;
|
int32_t refCount;
|
||||||
|
int64_t lastRefTime;
|
||||||
} SUdfcFuncStub;
|
} SUdfcFuncStub;
|
||||||
|
|
||||||
typedef struct SUdfcProxy {
|
typedef struct SUdfcProxy {
|
||||||
|
@ -1446,6 +1447,7 @@ int32_t accquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) {
|
||||||
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
|
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
|
||||||
*pHandle = foundStub->handle;
|
*pHandle = foundStub->handle;
|
||||||
++foundStub->refCount;
|
++foundStub->refCount;
|
||||||
|
foundStub->lastRefTime = taosGetTimestampUs();
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
*pHandle = NULL;
|
*pHandle = NULL;
|
||||||
|
@ -1455,6 +1457,7 @@ int32_t accquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) {
|
||||||
strcpy(stub.udfName, udfName);
|
strcpy(stub.udfName, udfName);
|
||||||
stub.handle = *pHandle;
|
stub.handle = *pHandle;
|
||||||
++stub.refCount;
|
++stub.refCount;
|
||||||
|
stub.lastRefTime = taosGetTimestampUs();
|
||||||
taosArrayPush(gUdfdProxy.udfStubs, &stub);
|
taosArrayPush(gUdfdProxy.udfStubs, &stub);
|
||||||
taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub);
|
taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1662,7 +1665,8 @@ int32_t cleanUpUdfs() {
|
||||||
fnInfo("tear down udf. udf name: %s, handle: %p", stub->udfName, stub->handle);
|
fnInfo("tear down udf. udf name: %s, handle: %p", stub->udfName, stub->handle);
|
||||||
doTeardownUdf(stub->handle);
|
doTeardownUdf(stub->handle);
|
||||||
} else {
|
} else {
|
||||||
fnInfo("udf still in use. udf name: %s, ref count: %d, handle: %p", stub->udfName, stub->refCount, stub->handle);
|
fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p",
|
||||||
|
stub->udfName, stub->refCount, stub->lastRefTime, stub->handle);
|
||||||
taosArrayPush(udfStubs, stub);
|
taosArrayPush(udfStubs, stub);
|
||||||
}
|
}
|
||||||
++i;
|
++i;
|
||||||
|
|
|
@ -485,7 +485,146 @@ void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
|
||||||
uv_stop(global.loop);
|
uv_stop(global.loop);
|
||||||
}
|
}
|
||||||
|
|
||||||
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { return; }
|
typedef enum EUdfdRpcReqRspType {
|
||||||
|
UDFD_RPC_MNODE_CONNECT = 0,
|
||||||
|
UDFD_RPC_RETRIVE_FUNC,
|
||||||
|
} EUdfdRpcReqRspType;
|
||||||
|
|
||||||
|
typedef struct SUdfdRpcSendRecvInfo {
|
||||||
|
EUdfdRpcReqRspType rpcType;
|
||||||
|
int32_t code;
|
||||||
|
void* param;
|
||||||
|
uv_sem_t resultSem;
|
||||||
|
} SUdfdRpcSendRecvInfo;
|
||||||
|
|
||||||
|
|
||||||
|
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->ahandle;
|
||||||
|
ASSERT(pMsg->ahandle != NULL);
|
||||||
|
|
||||||
|
if (pEpSet) {
|
||||||
|
if (!isEpsetEqual(&global.mgmtEp.epSet, pEpSet)) {
|
||||||
|
updateEpSet_s(&global.mgmtEp, pEpSet);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pMsg->code != TSDB_CODE_SUCCESS) {
|
||||||
|
fnError("udfd rpc error. code: %s", tstrerror(pMsg->code));
|
||||||
|
msgInfo->code = pMsg->code;
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) {
|
||||||
|
SConnectRsp connectRsp = {0};
|
||||||
|
tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp);
|
||||||
|
if (connectRsp.epSet.numOfEps == 0) {
|
||||||
|
msgInfo->code = TSDB_CODE_MND_APP_ERROR;
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&global.mgmtEp.epSet, &connectRsp.epSet)) {
|
||||||
|
updateEpSet_s(&global.mgmtEp, &connectRsp.epSet);
|
||||||
|
}
|
||||||
|
msgInfo->code = 0;
|
||||||
|
} else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) {
|
||||||
|
SRetrieveFuncRsp retrieveRsp = {0};
|
||||||
|
tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp);
|
||||||
|
|
||||||
|
SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
|
||||||
|
SUdf* udf = msgInfo->param;
|
||||||
|
udf->funcType = pFuncInfo->funcType;
|
||||||
|
udf->scriptType = pFuncInfo->scriptType;
|
||||||
|
udf->outputType = pFuncInfo->funcType;
|
||||||
|
udf->outputLen = pFuncInfo->outputLen;
|
||||||
|
udf->bufSize = pFuncInfo->bufSize;
|
||||||
|
|
||||||
|
char path[PATH_MAX] = {0};
|
||||||
|
snprintf(path, sizeof(path), "%s/lib%s.so", "/tmp", pFuncInfo->name);
|
||||||
|
TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
|
||||||
|
// TODO check for failure of flush to disk
|
||||||
|
taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
|
||||||
|
taosCloseFile(&file);
|
||||||
|
strncpy(udf->path, path, strlen(path));
|
||||||
|
taosArrayDestroy(retrieveRsp.pFuncInfos);
|
||||||
|
msgInfo->code = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
_return:
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
uv_sem_post(&msgInfo->resultSem);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t udfdConnectToMNode() {
|
||||||
|
SConnectReq connReq = {0};
|
||||||
|
connReq.connType = CONN_TYPE__UDFD;
|
||||||
|
tstrncpy(connReq.app, "udfd",sizeof(connReq.app));
|
||||||
|
tstrncpy(connReq.user, TSDB_DEFAULT_USER, sizeof(connReq.user));
|
||||||
|
char pass[TSDB_PASSWORD_LEN + 1] = {0};
|
||||||
|
taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass);
|
||||||
|
tstrncpy(connReq.passwd, pass, sizeof(connReq.passwd));
|
||||||
|
connReq.pid = htonl(taosGetPId());
|
||||||
|
connReq.startTime = htobe64(taosGetTimestampMs());
|
||||||
|
|
||||||
|
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq);
|
||||||
|
void* pReq = rpcMallocCont(contLen);
|
||||||
|
tSerializeSConnectReq(pReq, contLen, &connReq);
|
||||||
|
|
||||||
|
SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
|
||||||
|
msgInfo->rpcType = UDFD_RPC_MNODE_CONNECT;
|
||||||
|
uv_sem_init(&msgInfo->resultSem, 0);
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg = {0};
|
||||||
|
rpcMsg.msgType = TDMT_MND_CONNECT;
|
||||||
|
rpcMsg.pCont = pReq;
|
||||||
|
rpcMsg.contLen = contLen;
|
||||||
|
rpcMsg.ahandle = msgInfo;
|
||||||
|
rpcSendRequest(global.clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
|
||||||
|
|
||||||
|
uv_sem_wait(&msgInfo->resultSem);
|
||||||
|
int32_t code = msgInfo->code;
|
||||||
|
uv_sem_destroy(&msgInfo->resultSem);
|
||||||
|
taosMemoryFree(msgInfo);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
|
||||||
|
SRetrieveFuncReq retrieveReq = {0};
|
||||||
|
retrieveReq.numOfFuncs = 1;
|
||||||
|
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
|
||||||
|
taosArrayPush(retrieveReq.pFuncNames, udfName);
|
||||||
|
|
||||||
|
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
|
||||||
|
void *pReq = rpcMallocCont(contLen);
|
||||||
|
tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq);
|
||||||
|
taosArrayDestroy(retrieveReq.pFuncNames);
|
||||||
|
|
||||||
|
SUdfdRpcSendRecvInfo* msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
|
||||||
|
msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC;
|
||||||
|
msgInfo->param = udf;
|
||||||
|
uv_sem_init(&msgInfo->resultSem, 0);
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg = {0};
|
||||||
|
rpcMsg.pCont = pReq;
|
||||||
|
rpcMsg.contLen = contLen;
|
||||||
|
rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC;
|
||||||
|
rpcMsg.ahandle = msgInfo;
|
||||||
|
rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
|
||||||
|
|
||||||
|
uv_sem_wait(&msgInfo->resultSem);
|
||||||
|
uv_sem_destroy(&msgInfo->resultSem);
|
||||||
|
int32_t code = msgInfo->code;
|
||||||
|
taosMemoryFree(msgInfo);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool udfdRpcRfp(int32_t code) {
|
||||||
|
if (code == TSDB_CODE_RPC_REDIRECT) {
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
|
int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
|
||||||
pEpSet->version = 0;
|
pEpSet->version = 0;
|
||||||
|
@ -528,69 +667,30 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
|
|
||||||
SRetrieveFuncReq retrieveReq = {0};
|
|
||||||
retrieveReq.numOfFuncs = 1;
|
|
||||||
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
|
|
||||||
taosArrayPush(retrieveReq.pFuncNames, udfName);
|
|
||||||
|
|
||||||
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
|
|
||||||
void *pReq = rpcMallocCont(contLen);
|
|
||||||
tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq);
|
|
||||||
taosArrayDestroy(retrieveReq.pFuncNames);
|
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {0};
|
|
||||||
rpcMsg.pCont = pReq;
|
|
||||||
rpcMsg.contLen = contLen;
|
|
||||||
rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC;
|
|
||||||
|
|
||||||
SRpcMsg rpcRsp = {0};
|
|
||||||
rpcSendRecv(clientRpc, &global.mgmtEp.epSet, &rpcMsg, &rpcRsp);
|
|
||||||
SRetrieveFuncRsp retrieveRsp = {0};
|
|
||||||
tDeserializeSRetrieveFuncRsp(rpcRsp.pCont, rpcRsp.contLen, &retrieveRsp);
|
|
||||||
|
|
||||||
SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
|
|
||||||
|
|
||||||
udf->funcType = pFuncInfo->funcType;
|
|
||||||
udf->scriptType = pFuncInfo->scriptType;
|
|
||||||
udf->outputType = pFuncInfo->funcType;
|
|
||||||
udf->outputLen = pFuncInfo->outputLen;
|
|
||||||
udf->bufSize = pFuncInfo->bufSize;
|
|
||||||
|
|
||||||
char path[PATH_MAX] = {0};
|
|
||||||
snprintf(path, sizeof(path), "%s/lib%s.so", "/tmp", udfName);
|
|
||||||
TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
|
|
||||||
// TODO check for failure of flush to disk
|
|
||||||
taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
|
|
||||||
taosCloseFile(&file);
|
|
||||||
strncpy(udf->path, path, strlen(path));
|
|
||||||
taosArrayDestroy(retrieveRsp.pFuncInfos);
|
|
||||||
|
|
||||||
rpcFreeCont(rpcRsp.pCont);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t udfdOpenClientRpc() {
|
int32_t udfdOpenClientRpc() {
|
||||||
char *pass = "taosdata";
|
|
||||||
char *user = "root";
|
|
||||||
char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
|
|
||||||
taosEncryptPass_c((uint8_t *)pass, strlen(pass), secretEncrypt);
|
|
||||||
SRpcInit rpcInit = {0};
|
SRpcInit rpcInit = {0};
|
||||||
rpcInit.label = (char *)"UDFD";
|
rpcInit.label = "UDFD";
|
||||||
rpcInit.numOfThreads = 1;
|
rpcInit.numOfThreads = 1;
|
||||||
rpcInit.cfp = udfdProcessRpcRsp;
|
rpcInit.cfp = (RpcCfp)udfdProcessRpcRsp;
|
||||||
rpcInit.sessions = 1024;
|
rpcInit.sessions = 1024;
|
||||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||||
rpcInit.idleTime = 30 * 1000;
|
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||||
rpcInit.parent = &global;
|
rpcInit.user = TSDB_DEFAULT_USER;
|
||||||
|
rpcInit.ckey = "key";
|
||||||
rpcInit.user = (char *)user;
|
|
||||||
rpcInit.ckey = (char *)"key";
|
|
||||||
rpcInit.secret = (char *)secretEncrypt;
|
|
||||||
rpcInit.spi = 1;
|
rpcInit.spi = 1;
|
||||||
|
rpcInit.parent = &global;
|
||||||
|
rpcInit.rfp = udfdRpcRfp;
|
||||||
|
|
||||||
|
char pass[TSDB_PASSWORD_LEN + 1] = {0};
|
||||||
|
taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass);
|
||||||
|
rpcInit.secret = pass;
|
||||||
|
|
||||||
global.clientRpc = rpcOpen(&rpcInit);
|
global.clientRpc = rpcOpen(&rpcInit);
|
||||||
|
if (global.clientRpc == NULL) {
|
||||||
|
fnError("failed to init dnode rpc client");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -700,12 +800,6 @@ static int32_t udfdRun() {
|
||||||
global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
uv_mutex_init(&global.udfsMutex);
|
uv_mutex_init(&global.udfsMutex);
|
||||||
|
|
||||||
// TOOD: client rpc to fetch udf function info from mnode
|
|
||||||
if (udfdOpenClientRpc() != 0) {
|
|
||||||
fnError("open rpc connection to mnode failure");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (udfdUvInit() != 0) {
|
if (udfdUvInit() != 0) {
|
||||||
fnError("uv init failure");
|
fnError("uv init failure");
|
||||||
return -2;
|
return -2;
|
||||||
|
@ -717,7 +811,6 @@ static int32_t udfdRun() {
|
||||||
int codeClose = uv_loop_close(global.loop);
|
int codeClose = uv_loop_close(global.loop);
|
||||||
fnDebug("uv loop close. result: %s", uv_err_name(codeClose));
|
fnDebug("uv loop close. result: %s", uv_err_name(codeClose));
|
||||||
removeListeningPipe();
|
removeListeningPipe();
|
||||||
udfdCloseClientRpc();
|
|
||||||
uv_mutex_destroy(&global.udfsMutex);
|
uv_mutex_destroy(&global.udfsMutex);
|
||||||
taosHashCleanup(global.udfsHash);
|
taosHashCleanup(global.udfsHash);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -746,9 +839,22 @@ int main(int argc, char *argv[]) {
|
||||||
|
|
||||||
if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) {
|
if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) {
|
||||||
fnError("failed to start since read config error");
|
fnError("failed to start since read config error");
|
||||||
return -1;
|
return -2;
|
||||||
}
|
}
|
||||||
|
|
||||||
initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp);
|
initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp);
|
||||||
return udfdRun();
|
if (udfdOpenClientRpc() != 0) {
|
||||||
|
fnError("open rpc connection to mnode failure");
|
||||||
|
return -3;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (udfdConnectToMNode() != 0) {
|
||||||
|
fnError("failed to start since can not connect to mnode");
|
||||||
|
return -4;
|
||||||
|
}
|
||||||
|
|
||||||
|
udfdRun();
|
||||||
|
|
||||||
|
udfdCloseClientRpc();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,7 +7,7 @@ system sh/cfg.sh -n dnode1 -c udf -v 1
|
||||||
|
|
||||||
print ========= start dnode1 as LEADER
|
print ========= start dnode1 as LEADER
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
sleep 2000
|
sleep 1000
|
||||||
sql connect
|
sql connect
|
||||||
|
|
||||||
print ======== step1 udf
|
print ======== step1 udf
|
||||||
|
|
Loading…
Reference in New Issue