Merge remote-tracking branch 'origin/3.0' into feature/check

This commit is contained in:
Shengliang Guan 2022-04-25 10:07:31 +08:00
commit 5f370a1dcc
17 changed files with 822 additions and 263 deletions

2
Jenkinsfile vendored
View File

@ -113,7 +113,7 @@ pipeline {
''' '''
sh''' sh'''
cd ${WKC}/debug cd ${WKC}/debug
ctest ctest -VV
''' '''
} }
} }

View File

@ -121,7 +121,7 @@ pipeline {
pre_test() pre_test()
sh''' sh'''
cd ${WKC}/debug cd ${WKC}/debug
ctest ctest -VV
''' '''
sh''' sh'''
export LD_LIBRARY_PATH=${WKC}/debug/build/lib export LD_LIBRARY_PATH=${WKC}/debug/build/lib

View File

@ -74,8 +74,8 @@ static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData,
} }
char *data = colDataGetVarData(pColumnInfoData, row); char *data = colDataGetVarData(pColumnInfoData, row);
return (*data == TSDB_DATA_TYPE_NULL); return (*data == TSDB_DATA_TYPE_NULL);
} }
if (!pColumnInfoData->hasNull) { if (!pColumnInfoData->hasNull) {
return false; return false;
} }

View File

@ -1568,13 +1568,8 @@ int32_t tSerializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pReq
if (tEncodeI32(&encoder, pReq->codeLen) < 0) return -1; if (tEncodeI32(&encoder, pReq->codeLen) < 0) return -1;
if (tEncodeI64(&encoder, pReq->signature) < 0) return -1; if (tEncodeI64(&encoder, pReq->signature) < 0) return -1;
int32_t codeSize = 0;
if (pReq->pCode != NULL) { if (pReq->pCode != NULL) {
codeSize = strlen(pReq->pCode) + 1; if (tEncodeBinary(&encoder, pReq->pCode, pReq->codeLen) < 0) return -1;
}
if (tEncodeI32(&encoder, codeSize) < 0) return -1;
if (pReq->pCode != NULL) {
if (tEncodeCStr(&encoder, pReq->pCode) < 0) return -1;
} }
int32_t commentSize = 0; int32_t commentSize = 0;
@ -1608,10 +1603,8 @@ int32_t tDeserializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pR
if (tDecodeI32(&decoder, &pReq->codeLen) < 0) return -1; if (tDecodeI32(&decoder, &pReq->codeLen) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->signature) < 0) return -1; if (tDecodeI64(&decoder, &pReq->signature) < 0) return -1;
int32_t codeSize = 0; if (pReq->codeLen > 0) {
if (tDecodeI32(&decoder, &codeSize) < 0) return -1; pReq->pCode = taosMemoryCalloc(1, pReq->codeLen);
if (codeSize > 0) {
pReq->pCode = taosMemoryCalloc(1, codeSize);
if (pReq->pCode == NULL) { if (pReq->pCode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
@ -1734,7 +1727,7 @@ int32_t tSerializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp *
if (tEncodeI32(&encoder, pInfo->codeSize) < 0) return -1; if (tEncodeI32(&encoder, pInfo->codeSize) < 0) return -1;
if (tEncodeI32(&encoder, pInfo->commentSize) < 0) return -1; if (tEncodeI32(&encoder, pInfo->commentSize) < 0) return -1;
if (pInfo->codeSize) { if (pInfo->codeSize) {
if (tEncodeCStr(&encoder, pInfo->pCode) < 0) return -1; if (tEncodeBinary(&encoder, pInfo->pCode, pInfo->codeSize) < 0) return -1;
} }
if (pInfo->commentSize) { if (pInfo->commentSize) {
if (tEncodeCStr(&encoder, pInfo->pComment) < 0) return -1; if (tEncodeCStr(&encoder, pInfo->pComment) < 0) return -1;

View File

@ -311,6 +311,9 @@ static void dmWatchUdfd(void *args) {
} }
static int32_t dmStartUdfd(SDnode *pDnode) { static int32_t dmStartUdfd(SDnode *pDnode) {
char dnodeId[8] = {0};
snprintf(dnodeId, sizeof(dnodeId), "%d", pDnode->data.dnodeId);
uv_os_setenv("DNODE_ID", dnodeId);
SUdfdData *pData = &pDnode->udfdData; SUdfdData *pData = &pDnode->udfdData;
if (pData->startCalled) { if (pData->startCalled) {
dInfo("dnode-mgmt start udfd already called"); dInfo("dnode-mgmt start udfd already called");
@ -320,8 +323,17 @@ static int32_t dmStartUdfd(SDnode *pDnode) {
uv_barrier_init(&pData->barrier, 2); uv_barrier_init(&pData->barrier, 2);
uv_thread_create(&pData->thread, dmWatchUdfd, pDnode); uv_thread_create(&pData->thread, dmWatchUdfd, pDnode);
uv_barrier_wait(&pData->barrier); uv_barrier_wait(&pData->barrier);
pData->needCleanUp = true; int32_t err = atomic_load_32(&pData->spawnErr);
return pData->spawnErr; if (err != 0) {
uv_barrier_destroy(&pData->barrier);
uv_async_send(&pData->stopAsync);
uv_thread_join(&pData->thread);
pData->needCleanUp = false;
dInfo("dnode-mgmt udfd cleaned up after spawn err");
} else {
pData->needCleanUp = true;
}
return err;
} }
static int32_t dmStopUdfd(SDnode *pDnode) { static int32_t dmStopUdfd(SDnode *pDnode) {
@ -336,7 +348,7 @@ static int32_t dmStopUdfd(SDnode *pDnode) {
uv_barrier_destroy(&pData->barrier); uv_barrier_destroy(&pData->barrier);
uv_async_send(&pData->stopAsync); uv_async_send(&pData->stopAsync);
uv_thread_join(&pData->thread); uv_thread_join(&pData->thread);
dInfo("dnode-mgmt udfd cleaned up");
return 0; return 0;
} }
@ -371,9 +383,9 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
} }
dmReportStartup(pDnode, "dnode-transport", "initialized"); dmReportStartup(pDnode, "dnode-transport", "initialized");
// if (dmStartUdfd(pDnode) != 0) { if (dmStartUdfd(pDnode) != 0) {
// dError("failed to start udfd"); dError("failed to start udfd");
// } }
dInfo("dnode-mgmt is initialized"); dInfo("dnode-mgmt is initialized");
return 0; return 0;

View File

@ -309,10 +309,10 @@ static int32_t mndProcessCreateFuncReq(SNodeMsg *pReq) {
goto _OVER; goto _OVER;
} }
if (createReq.pCode[0] == 0) { if (createReq.codeLen <= 1) {
terrno = TSDB_CODE_MND_INVALID_FUNC_CODE; terrno = TSDB_CODE_MND_INVALID_FUNC_CODE;
goto _OVER; goto _OVER;
} }
if (createReq.bufSize <= 0 || createReq.bufSize > TSDB_FUNC_BUF_SIZE) { if (createReq.bufSize <= 0 || createReq.bufSize > TSDB_FUNC_BUF_SIZE) {
terrno = TSDB_CODE_MND_INVALID_FUNC_BUFSIZE; terrno = TSDB_CODE_MND_INVALID_FUNC_BUFSIZE;

View File

@ -22,17 +22,16 @@ class MndTestFunc : public ::testing::Test {
void SetUp() override {} void SetUp() override {}
void TearDown() override {} void TearDown() override {}
void SetCode(SCreateFuncReq* pReq, const char* pCode); void SetCode(SCreateFuncReq* pReq, const char* pCode, int32_t size);
void SetComment(SCreateFuncReq* pReq, const char* pComment); void SetComment(SCreateFuncReq* pReq, const char* pComment);
}; };
Testbase MndTestFunc::test; Testbase MndTestFunc::test;
void MndTestFunc::SetCode(SCreateFuncReq* pReq, const char* pCode) { void MndTestFunc::SetCode(SCreateFuncReq *pReq, const char *pCode, int32_t size) {
int32_t len = strlen(pCode); pReq->pCode = (char*)taosMemoryMalloc(size);
pReq->pCode = (char*)taosMemoryCalloc(1, len + 1); memcpy(pReq->pCode, pCode, size);
strcpy(pReq->pCode, pCode); pReq->codeLen = size;
pReq->codeLen = len;
} }
void MndTestFunc::SetComment(SCreateFuncReq* pReq, const char* pComment) { void MndTestFunc::SetComment(SCreateFuncReq* pReq, const char* pComment) {
@ -79,7 +78,7 @@ TEST_F(MndTestFunc, 02_Create_Func) {
{ {
SCreateFuncReq createReq = {0}; SCreateFuncReq createReq = {0};
strcpy(createReq.name, "f1"); strcpy(createReq.name, "f1");
SetCode(&createReq, ""); SetCode(&createReq, "", 1);
SetComment(&createReq, "comment1"); SetComment(&createReq, "comment1");
int32_t contLen = tSerializeSCreateFuncReq(NULL, 0, &createReq); int32_t contLen = tSerializeSCreateFuncReq(NULL, 0, &createReq);
@ -95,7 +94,7 @@ TEST_F(MndTestFunc, 02_Create_Func) {
{ {
SCreateFuncReq createReq = {0}; SCreateFuncReq createReq = {0};
strcpy(createReq.name, "f1"); strcpy(createReq.name, "f1");
SetCode(&createReq, "code1"); SetCode(&createReq, "code1", 6);
SetComment(&createReq, "comment1"); SetComment(&createReq, "comment1");
int32_t contLen = tSerializeSCreateFuncReq(NULL, 0, &createReq); int32_t contLen = tSerializeSCreateFuncReq(NULL, 0, &createReq);
@ -111,7 +110,7 @@ TEST_F(MndTestFunc, 02_Create_Func) {
{ {
SCreateFuncReq createReq = {0}; SCreateFuncReq createReq = {0};
strcpy(createReq.name, "f1"); strcpy(createReq.name, "f1");
SetCode(&createReq, "code1"); SetCode(&createReq, "code1", 6);
SetComment(&createReq, "comment1"); SetComment(&createReq, "comment1");
createReq.bufSize = TSDB_FUNC_BUF_SIZE + 1; createReq.bufSize = TSDB_FUNC_BUF_SIZE + 1;
@ -128,7 +127,7 @@ TEST_F(MndTestFunc, 02_Create_Func) {
for (int32_t i = 0; i < 3; ++i) { for (int32_t i = 0; i < 3; ++i) {
SCreateFuncReq createReq = {0}; SCreateFuncReq createReq = {0};
strcpy(createReq.name, "f1"); strcpy(createReq.name, "f1");
SetCode(&createReq, "code1"); SetCode(&createReq, "code1", 6);
SetComment(&createReq, "comment1"); SetComment(&createReq, "comment1");
createReq.bufSize = TSDB_FUNC_BUF_SIZE + 1; createReq.bufSize = TSDB_FUNC_BUF_SIZE + 1;
createReq.igExists = 0; createReq.igExists = 0;
@ -253,7 +252,7 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) {
createReq.outputLen = 24; createReq.outputLen = 24;
createReq.bufSize = 6; createReq.bufSize = 6;
createReq.signature = 18; createReq.signature = 18;
SetCode(&createReq, "code2"); SetCode(&createReq, "code2", 6);
SetComment(&createReq, "comment2"); SetComment(&createReq, "comment2");
int32_t contLen = tSerializeSCreateFuncReq(NULL, 0, &createReq); int32_t contLen = tSerializeSCreateFuncReq(NULL, 0, &createReq);
@ -439,3 +438,70 @@ TEST_F(MndTestFunc, 04_Drop_Func) {
test.SendShowReq(TSDB_MGMT_TABLE_FUNC, "user_functions", ""); test.SendShowReq(TSDB_MGMT_TABLE_FUNC, "user_functions", "");
EXPECT_EQ(test.GetShowRows(), 1); EXPECT_EQ(test.GetShowRows(), 1);
} }
TEST_F(MndTestFunc, 05_Actual_code) {
{
SCreateFuncReq createReq = {0};
strcpy(createReq.name, "udf1");
char code[300] = {0};
for (int32_t i = 0; i < sizeof(code); ++i) {
code[i] = (i) % 20;
}
SetCode(&createReq, code, 300);
SetComment(&createReq, "comment1");
createReq.bufSize = 8;
createReq.igExists = 0;
createReq.funcType = 1;
createReq.scriptType = 2;
createReq.outputType = TSDB_DATA_TYPE_SMALLINT;
createReq.outputLen = 12;
createReq.bufSize = 4;
createReq.signature = 5;
int32_t contLen = tSerializeSCreateFuncReq(NULL, 0, &createReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSCreateFuncReq(pReq, contLen, &createReq);
tFreeSCreateFuncReq(&createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_FUNC, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
{
SRetrieveFuncReq retrieveReq = {0};
retrieveReq.numOfFuncs = 1;
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
taosArrayPush(retrieveReq.pFuncNames, "udf1");
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq);
tFreeSRetrieveFuncReq(&retrieveReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_RETRIEVE_FUNC, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
SRetrieveFuncRsp retrieveRsp = {0};
tDeserializeSRetrieveFuncRsp(pRsp->pCont, pRsp->contLen, &retrieveRsp);
EXPECT_EQ(retrieveRsp.numOfFuncs, 1);
EXPECT_EQ(retrieveRsp.numOfFuncs, (int32_t)taosArrayGetSize(retrieveRsp.pFuncInfos));
SFuncInfo* pFuncInfo = (SFuncInfo*)taosArrayGet(retrieveRsp.pFuncInfos, 0);
EXPECT_STREQ(pFuncInfo->name, "udf1");
EXPECT_EQ(pFuncInfo->funcType, 1);
EXPECT_EQ(pFuncInfo->scriptType, 2);
EXPECT_EQ(pFuncInfo->outputType, TSDB_DATA_TYPE_SMALLINT);
EXPECT_EQ(pFuncInfo->outputLen, 12);
EXPECT_EQ(pFuncInfo->bufSize, 4);
EXPECT_EQ(pFuncInfo->signature, 5);
EXPECT_STREQ("comment1", pFuncInfo->pComment);
for (int32_t i = 0; i < 300; ++i) {
EXPECT_EQ(pFuncInfo->pCode[i], (i) % 20);
}
tFreeSRetrieveFuncRsp(&retrieveRsp);
}
}

View File

@ -29,29 +29,32 @@ extern "C" {
#define UDF_LISTEN_PIPE_NAME_LEN 32 #define UDF_LISTEN_PIPE_NAME_LEN 32
#define UDF_LISTEN_PIPE_NAME_PREFIX "udfd.sock." #define UDF_LISTEN_PIPE_NAME_PREFIX "udfd.sock."
#define UDF_DNODE_ID_ENV_NAME "DNODE_ID"
//====================================================================================== //======================================================================================
//begin API to taosd and qworker //begin API to taosd and qworker
enum { enum {
UDFC_CODE_STOPPING = -1, UDFC_CODE_STOPPING = -1,
UDFC_CODE_PIPE_READ_ERR = -3, UDFC_CODE_PIPE_READ_ERR = -2,
UDFC_CODE_CONNECT_PIPE_ERR = -3,
UDFC_CODE_LOAD_UDF_FAILURE = -4,
UDFC_CODE_INVALID_STATE = -5
}; };
typedef void *UdfcHandle;
typedef void *UdfcFuncHandle; typedef void *UdfcFuncHandle;
/** /**
* create udfd proxy, called once in process that call setupUdf/callUdfxxx/teardownUdf * create udfd proxy, called once in process that call setupUdf/callUdfxxx/teardownUdf
* @return error code * @return error code
*/ */
int32_t udfcOpen(int32_t dnodeId, UdfcHandle* proxyHandle); int32_t udfcOpen();
/** /**
* destroy udfd proxy * destroy udfd proxy
* @return error code * @return error code
*/ */
int32_t udfcClose(UdfcHandle proxyhandle); int32_t udfcClose();
/** /**
@ -60,7 +63,7 @@ int32_t udfcClose(UdfcHandle proxyhandle);
* @param handle, out * @param handle, out
* @return error code * @return error code
*/ */
int32_t setupUdf(UdfcHandle proxyHandle, char udfName[], SEpSet *epSet, UdfcFuncHandle *handle); int32_t setupUdf(char udfName[], UdfcFuncHandle *handle);
typedef struct SUdfColumnMeta { typedef struct SUdfColumnMeta {
int16_t type; int16_t type;

View File

@ -39,7 +39,6 @@ enum {
typedef struct SUdfSetupRequest { typedef struct SUdfSetupRequest {
char udfName[TSDB_FUNC_NAME_LEN]; char udfName[TSDB_FUNC_NAME_LEN];
SEpSet epSet;
} SUdfSetupRequest; } SUdfSetupRequest;
typedef struct SUdfSetupResponse { typedef struct SUdfSetupResponse {
@ -112,6 +111,7 @@ void freeUdfDataDataBlock(SUdfDataBlock *block);
int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock); int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock);
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block); int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block);
int32_t getUdfdPipeName(char* pipeName, int32_t size);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -355,7 +355,7 @@ static int32_t translateToIso8601(SFunctionNode* pFunc, char* pErrBuf, int32_t l
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
} }
pFunc->node.resType = (SDataType) { .bytes = 64, .type = TSDB_DATA_TYPE_BINARY}; pFunc->node.resType = (SDataType) { .bytes = 24, .type = TSDB_DATA_TYPE_BINARY};
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -124,7 +124,7 @@ enum {
int64_t gUdfTaskSeqNum = 0; int64_t gUdfTaskSeqNum = 0;
typedef struct SUdfdProxy { typedef struct SUdfdProxy {
int32_t dnodeId; char udfdPipeName[UDF_LISTEN_PIPE_NAME_LEN];
uv_barrier_t gUdfInitBarrier; uv_barrier_t gUdfInitBarrier;
uv_loop_t gUdfdLoop; uv_loop_t gUdfdLoop;
@ -137,11 +137,11 @@ typedef struct SUdfdProxy {
int8_t gUdfcState; int8_t gUdfcState;
QUEUE gUdfTaskQueue; QUEUE gUdfTaskQueue;
QUEUE gUvProcTaskQueue; QUEUE gUvProcTaskQueue;
// int8_t gUdfcState = UDFC_STATE_INITAL;
// QUEUE gUdfTaskQueue = {0}; int8_t initialized;
// QUEUE gUvProcTaskQueue = {0};
} SUdfdProxy; } SUdfdProxy;
SUdfdProxy gUdfdProxy = {0};
typedef struct SUdfUvSession { typedef struct SUdfUvSession {
SUdfdProxy *udfc; SUdfdProxy *udfc;
@ -209,19 +209,27 @@ enum {
UDFC_STATE_STARTNG, // starting after udfcOpen UDFC_STATE_STARTNG, // starting after udfcOpen
UDFC_STATE_READY, // started and begin to receive quests UDFC_STATE_READY, // started and begin to receive quests
UDFC_STATE_STOPPING, // stopping after udfcClose UDFC_STATE_STOPPING, // stopping after udfcClose
UDFC_STATUS_FINAL, // stopped
}; };
int32_t getUdfdPipeName(char* pipeName, int32_t size) {
char dnodeId[8] = {0};
size_t dnodeIdSize;
int32_t err = uv_os_getenv(UDF_DNODE_ID_ENV_NAME, dnodeId, &dnodeIdSize);
if (err != 0) {
dnodeId[0] = '1';
}
snprintf(pipeName, size, "%s%s", UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId);
return 0;
}
int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) { int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) {
int32_t len = 0; int32_t len = 0;
len += taosEncodeBinary(buf, setup->udfName, TSDB_FUNC_NAME_LEN); len += taosEncodeBinary(buf, setup->udfName, TSDB_FUNC_NAME_LEN);
len += taosEncodeSEpSet(buf, &setup->epSet);
return len; return len;
} }
void* decodeUdfSetupRequest(const void* buf, SUdfSetupRequest *request) { void* decodeUdfSetupRequest(const void* buf, SUdfSetupRequest *request) {
buf = taosDecodeBinaryTo(buf, request->udfName, TSDB_FUNC_NAME_LEN); buf = taosDecodeBinaryTo(buf, request->udfName, TSDB_FUNC_NAME_LEN);
buf = taosDecodeSEpSet((void*)buf, &request->epSet);
return (void*)buf; return (void*)buf;
} }
@ -604,7 +612,7 @@ void onUdfcPipeClose(uv_handle_t *handle) {
} }
int32_t udfcGetUvTaskResponseResult(SClientUdfTask *task, SClientUvTaskNode *uvTask) { int32_t udfcGetUvTaskResponseResult(SClientUdfTask *task, SClientUvTaskNode *uvTask) {
debugPrint("%s", "get uv task result"); fnDebug("udfc get uv task result. task: %p", task);
if (uvTask->type == UV_TASK_REQ_RSP) { if (uvTask->type == UV_TASK_REQ_RSP) {
if (uvTask->rspBuf.base != NULL) { if (uvTask->rspBuf.base != NULL) {
SUdfResponse rsp; SUdfResponse rsp;
@ -647,7 +655,6 @@ int32_t udfcGetUvTaskResponseResult(SClientUdfTask *task, SClientUvTaskNode *uvT
} }
void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) { void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
debugPrint("%s", "client allocate buffer to receive from pipe");
SClientUvConn *conn = handle->data; SClientUvConn *conn = handle->data;
SClientConnBuf *connBuf = &conn->readBuf; SClientConnBuf *connBuf = &conn->readBuf;
@ -662,7 +669,7 @@ void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf
buf->base = connBuf->buf; buf->base = connBuf->buf;
buf->len = connBuf->cap; buf->len = connBuf->cap;
} else { } else {
//TODO: log error fnError("udfc allocate buffer failure. size: %d", msgHeadSize);
buf->base = NULL; buf->base = NULL;
buf->len = 0; buf->len = 0;
} }
@ -674,13 +681,13 @@ void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf
buf->base = connBuf->buf + connBuf->len; buf->base = connBuf->buf + connBuf->len;
buf->len = connBuf->cap - connBuf->len; buf->len = connBuf->cap - connBuf->len;
} else { } else {
//TODO: log error free connBuf->buf fnError("udfc re-allocate buffer failure. size: %d", connBuf->cap);
buf->base = NULL; buf->base = NULL;
buf->len = 0; buf->len = 0;
} }
} }
debugPrint("\tconn buf cap - len - total : %d - %d - %d", connBuf->cap, connBuf->len, connBuf->total); fnTrace("conn buf cap - len - total : %d - %d - %d", connBuf->cap, connBuf->len, connBuf->total);
} }
@ -689,6 +696,7 @@ bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) {
connBuf->total = *(int32_t *) (connBuf->buf); connBuf->total = *(int32_t *) (connBuf->buf);
} }
if (connBuf->len == connBuf->cap && connBuf->total == connBuf->cap) { if (connBuf->len == connBuf->cap && connBuf->total == connBuf->cap) {
fnTrace("udfc complete message is received, now handle it");
return true; return true;
} }
return false; return false;
@ -696,10 +704,10 @@ bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) {
void udfcUvHandleRsp(SClientUvConn *conn) { void udfcUvHandleRsp(SClientUvConn *conn) {
SClientConnBuf *connBuf = &conn->readBuf; SClientConnBuf *connBuf = &conn->readBuf;
int64_t seqNum = *(int64_t *) (connBuf->buf + sizeof(int32_t)); // msglen int32_t then seqnum int64_t seqNum = *(int64_t *) (connBuf->buf + sizeof(int32_t)); // msglen then seqnum
if (QUEUE_EMPTY(&conn->taskQueue)) { if (QUEUE_EMPTY(&conn->taskQueue)) {
//LOG error fnError("udfc no task waiting for response on connection");
return; return;
} }
bool found = false; bool found = false;
@ -713,7 +721,7 @@ void udfcUvHandleRsp(SClientUvConn *conn) {
found = true; found = true;
taskFound = task; taskFound = task;
} else { } else {
//LOG error; fnError("udfc more than one task waiting for the same response");
continue; continue;
} }
} }
@ -727,7 +735,7 @@ void udfcUvHandleRsp(SClientUvConn *conn) {
uv_sem_post(&taskFound->taskSem); uv_sem_post(&taskFound->taskSem);
QUEUE_REMOVE(&taskFound->procTaskQueue); QUEUE_REMOVE(&taskFound->procTaskQueue);
} else { } else {
//TODO: LOG error fnError("no task is waiting for the response.");
} }
connBuf->buf = NULL; connBuf->buf = NULL;
connBuf->total = -1; connBuf->total = -1;
@ -751,7 +759,7 @@ void udfcUvHandleError(SClientUvConn *conn) {
} }
void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
debugPrint("%s, nread: %zd", "client read from pipe", nread); fnTrace("udfc client %p, client read from pipe. nread: %zd", client, nread);
if (nread == 0) return; if (nread == 0) return;
SClientUvConn *conn = client->data; SClientUvConn *conn = client->data;
@ -764,9 +772,9 @@ void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
} }
if (nread < 0) { if (nread < 0) {
debugPrint("\tclient read error: %s", uv_strerror(nread)); fnError("udfc client pipe %p read error: %s", client, uv_strerror(nread));
if (nread == UV_EOF) { if (nread == UV_EOF) {
//TODO: fnError("udfc client pipe %p closed", client);
} }
udfcUvHandleError(conn); udfcUvHandleError(conn);
} }
@ -774,16 +782,15 @@ void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
} }
void onUdfClientWrite(uv_write_t *write, int status) { void onUdfClientWrite(uv_write_t *write, int status) {
debugPrint("%s", "after writing to pipe");
SClientUvTaskNode *uvTask = write->data; SClientUvTaskNode *uvTask = write->data;
uv_pipe_t *pipe = uvTask->pipe;
if (status == 0) { if (status == 0) {
uv_pipe_t *pipe = uvTask->pipe;
SClientUvConn *conn = pipe->data; SClientUvConn *conn = pipe->data;
QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue); QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
} else { } else {
//TODO Log error; fnError("udfc client %p write error.", pipe);
} }
debugPrint("\tlength:%zu", uvTask->reqBuf.len); fnTrace("udfc client %p write length:%zu", pipe, uvTask->reqBuf.len);
taosMemoryFree(write); taosMemoryFree(write);
taosMemoryFree(uvTask->reqBuf.base); taosMemoryFree(uvTask->reqBuf.base);
} }
@ -841,7 +848,7 @@ int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN
} }
int32_t queueUvUdfTask(SClientUvTaskNode *uvTask) { int32_t queueUvUdfTask(SClientUvTaskNode *uvTask) {
debugPrint("%s, %d", "queue uv task", uvTask->type); fnTrace("queue uv task to event loop, task: %d, %p", uvTask->type, uvTask);
SUdfdProxy *udfc = uvTask->udfc; SUdfdProxy *udfc = uvTask->udfc;
uv_mutex_lock(&udfc->gUdfTaskQueueMutex); uv_mutex_lock(&udfc->gUdfTaskQueueMutex);
QUEUE_INSERT_TAIL(&udfc->gUdfTaskQueue, &uvTask->recvTaskQueue); QUEUE_INSERT_TAIL(&udfc->gUdfTaskQueue, &uvTask->recvTaskQueue);
@ -855,7 +862,7 @@ int32_t queueUvUdfTask(SClientUvTaskNode *uvTask) {
} }
int32_t startUvUdfTask(SClientUvTaskNode *uvTask) { int32_t startUvUdfTask(SClientUvTaskNode *uvTask) {
debugPrint("%s, type %d", "start uv task ", uvTask->type); fnTrace("event loop start uv task. task: %d, %p", uvTask->type, uvTask);
switch (uvTask->type) { switch (uvTask->type) {
case UV_TASK_CONNECT: { case UV_TASK_CONNECT: {
uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t)); uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t));
@ -874,8 +881,7 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) {
uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t)); uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t));
connReq->data = uvTask; connReq->data = uvTask;
uv_pipe_connect(connReq, pipe, uvTask->udfc->udfdPipeName, onUdfClientConnect);
uv_pipe_connect(connReq, pipe, "udf.sock", onUdfClientConnect);
break; break;
} }
case UV_TASK_REQ_RSP: { case UV_TASK_REQ_RSP: {
@ -971,27 +977,37 @@ void constructUdfService(void *argsThread) {
uv_loop_close(&udfc->gUdfdLoop); uv_loop_close(&udfc->gUdfdLoop);
} }
int32_t udfcOpen(int32_t dnodeId, UdfcHandle *udfc) { int32_t udfcOpen() {
SUdfdProxy *proxy = taosMemoryCalloc(1, sizeof(SUdfdProxy)); int8_t old = atomic_val_compare_exchange_8(&gUdfdProxy.initialized, 0, 1);
proxy->dnodeId = dnodeId; if (old == 1) {
return 0;
}
SUdfdProxy *proxy = &gUdfdProxy;
getUdfdPipeName(proxy->udfdPipeName, UDF_LISTEN_PIPE_NAME_LEN);
proxy->gUdfcState = UDFC_STATE_STARTNG; proxy->gUdfcState = UDFC_STATE_STARTNG;
uv_barrier_init(&proxy->gUdfInitBarrier, 2); uv_barrier_init(&proxy->gUdfInitBarrier, 2);
uv_thread_create(&proxy->gUdfLoopThread, constructUdfService, proxy); uv_thread_create(&proxy->gUdfLoopThread, constructUdfService, proxy);
uv_barrier_wait(&proxy->gUdfInitBarrier); atomic_store_8(&proxy->gUdfcState, UDFC_STATE_READY);
proxy->gUdfcState = UDFC_STATE_READY; proxy->gUdfcState = UDFC_STATE_READY;
*udfc = proxy; uv_barrier_wait(&proxy->gUdfInitBarrier);
fnInfo("udfc initialized")
return 0; return 0;
} }
int32_t udfcClose(UdfcHandle udfcHandle) { int32_t udfcClose() {
SUdfdProxy *udfc = udfcHandle; int8_t old = atomic_val_compare_exchange_8(&gUdfdProxy.initialized, 1, 0);
if (old == 0) {
return 0;
}
SUdfdProxy *udfc = &gUdfdProxy;
udfc->gUdfcState = UDFC_STATE_STOPPING; udfc->gUdfcState = UDFC_STATE_STOPPING;
uv_async_send(&udfc->gUdfLoopStopAsync); uv_async_send(&udfc->gUdfLoopStopAsync);
uv_thread_join(&udfc->gUdfLoopThread); uv_thread_join(&udfc->gUdfLoopThread);
uv_mutex_destroy(&udfc->gUdfTaskQueueMutex); uv_mutex_destroy(&udfc->gUdfTaskQueueMutex);
uv_barrier_destroy(&udfc->gUdfInitBarrier); uv_barrier_destroy(&udfc->gUdfInitBarrier);
udfc->gUdfcState = UDFC_STATUS_FINAL; udfc->gUdfcState = UDFC_STATE_INITAL;
taosMemoryFree(udfc); fnInfo("udfc cleaned up");
return 0; return 0;
} }
@ -1009,12 +1025,15 @@ int32_t udfcRunUvTask(SClientUdfTask *task, int8_t uvTaskType) {
return task->errCode; return task->errCode;
} }
int32_t setupUdf(UdfcHandle udfc, char udfName[], SEpSet *epSet, UdfcFuncHandle *funcHandle) { int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
debugPrint("%s", "client setup udf"); fnInfo("udfc setup udf. udfName: %s", udfName);
if (gUdfdProxy.gUdfcState != UDFC_STATE_READY) {
return UDFC_CODE_INVALID_STATE;
}
SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask)); SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask));
task->errCode = 0; task->errCode = 0;
task->session = taosMemoryMalloc(sizeof(SUdfUvSession)); task->session = taosMemoryMalloc(sizeof(SUdfUvSession));
task->session->udfc = udfc; task->session->udfc = &gUdfdProxy;
task->type = UDF_TASK_SETUP; task->type = UDF_TASK_SETUP;
SUdfSetupRequest *req = &task->_setup.req; SUdfSetupRequest *req = &task->_setup.req;
@ -1022,15 +1041,20 @@ int32_t setupUdf(UdfcHandle udfc, char udfName[], SEpSet *epSet, UdfcFuncHandle
int32_t errCode = udfcRunUvTask(task, UV_TASK_CONNECT); int32_t errCode = udfcRunUvTask(task, UV_TASK_CONNECT);
if (errCode != 0) { if (errCode != 0) {
//TODO: log error fnError("failed to connect to pipe. udfName: %s, pipe: %s", udfName, (&gUdfdProxy)->udfdPipeName);
return -1; return UDFC_CODE_CONNECT_PIPE_ERR;
} }
udfcRunUvTask(task, UV_TASK_REQ_RSP); udfcRunUvTask(task, UV_TASK_REQ_RSP);
SUdfSetupResponse *rsp = &task->_setup.rsp; SUdfSetupResponse *rsp = &task->_setup.rsp;
task->session->severHandle = rsp->udfHandle; task->session->severHandle = rsp->udfHandle;
*funcHandle = task->session; if (task->errCode != 0) {
fnError("failed to setup udf. err: %d", task->errCode)
} else {
fnInfo("sucessfully setup udf func handle. handle: %p", task->session);
*funcHandle = task->session;
}
int32_t err = task->errCode; int32_t err = task->errCode;
taosMemoryFree(task); taosMemoryFree(task);
return err; return err;
@ -1038,7 +1062,7 @@ int32_t setupUdf(UdfcHandle udfc, char udfName[], SEpSet *epSet, UdfcFuncHandle
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2, int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
SSDataBlock* output, SUdfInterBuf *newState) { SSDataBlock* output, SUdfInterBuf *newState) {
debugPrint("%s", "client call udf"); fnTrace("udfc call udf. callType: %d, funcHandle: %p", callType, handle);
SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask)); SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask));
task->errCode = 0; task->errCode = 0;
@ -1076,35 +1100,37 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf
udfcRunUvTask(task, UV_TASK_REQ_RSP); udfcRunUvTask(task, UV_TASK_REQ_RSP);
SUdfCallResponse *rsp = &task->_call.rsp; if (task->errCode != 0) {
switch (callType) { fnError("call udf failure. err: %d", task->errCode);
case TSDB_UDF_CALL_AGG_INIT: { } else {
*newState = rsp->resultBuf; SUdfCallResponse *rsp = &task->_call.rsp;
break; switch (callType) {
} case TSDB_UDF_CALL_AGG_INIT: {
case TSDB_UDF_CALL_AGG_PROC: { *newState = rsp->resultBuf;
*newState = rsp->resultBuf; break;
break; }
} case TSDB_UDF_CALL_AGG_PROC: {
case TSDB_UDF_CALL_AGG_MERGE: { *newState = rsp->resultBuf;
*newState = rsp->resultBuf; break;
break; }
} case TSDB_UDF_CALL_AGG_MERGE: {
case TSDB_UDF_CALL_AGG_FIN: { *newState = rsp->resultBuf;
*newState = rsp->resultBuf; break;
break; }
} case TSDB_UDF_CALL_AGG_FIN: {
case TSDB_UDF_CALL_SCALA_PROC: { *newState = rsp->resultBuf;
*output = rsp->resultData; break;
break; }
case TSDB_UDF_CALL_SCALA_PROC: {
*output = rsp->resultData;
break;
}
} }
} }
taosMemoryFree(task); taosMemoryFree(task);
return task->errCode; return task->errCode;
} }
//TODO: translate these calls to callUdf
int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) { int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) {
int8_t callType = TSDB_UDF_CALL_AGG_INIT; int8_t callType = TSDB_UDF_CALL_AGG_INIT;
@ -1148,7 +1174,7 @@ int32_t callUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t nu
} }
int32_t teardownUdf(UdfcFuncHandle handle) { int32_t teardownUdf(UdfcFuncHandle handle) {
debugPrint("%s", "client teardown udf"); fnInfo("tear down udf. udf func handle: %p", handle);
SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask)); SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask));
task->errCode = 0; task->errCode = 0;
@ -1160,7 +1186,6 @@ int32_t teardownUdf(UdfcFuncHandle handle) {
udfcRunUvTask(task, UV_TASK_REQ_RSP); udfcRunUvTask(task, UV_TASK_REQ_RSP);
SUdfTeardownResponse *rsp = &task->_teardown.rsp; SUdfTeardownResponse *rsp = &task->_teardown.rsp;
int32_t err = task->errCode; int32_t err = task->errCode;

View File

@ -20,6 +20,7 @@
#include "tudf.h" #include "tudf.h"
#include "tudfInt.h" #include "tudfInt.h"
#include "tdatablock.h"
#include "tdataformat.h" #include "tdataformat.h"
#include "tglobal.h" #include "tglobal.h"
#include "tmsg.h" #include "tmsg.h"
@ -31,8 +32,9 @@ typedef struct SUdfdContext {
uv_signal_t intrSignal; uv_signal_t intrSignal;
char listenPipeName[UDF_LISTEN_PIPE_NAME_LEN]; char listenPipeName[UDF_LISTEN_PIPE_NAME_LEN];
uv_pipe_t listeningPipe; uv_pipe_t listeningPipe;
void *clientRpc;
void *clientRpc;
SCorEpSet mgmtEp;
uv_mutex_t udfsMutex; uv_mutex_t udfsMutex;
SHashObj *udfsHash; SHashObj *udfsHash;
@ -63,8 +65,13 @@ typedef struct SUdf {
uv_mutex_t lock; uv_mutex_t lock;
uv_cond_t condReady; uv_cond_t condReady;
char name[16]; char name[TSDB_FUNC_NAME_LEN];
int8_t type; int8_t funcType;
int8_t scriptType;
int8_t outputType;
int32_t outputLen;
int32_t bufSize;
char path[PATH_MAX]; char path[PATH_MAX];
uv_lib_t lib; uv_lib_t lib;
@ -78,17 +85,17 @@ typedef struct SUdfcFuncHandle {
SUdf *udf; SUdf *udf;
} SUdfcFuncHandle; } SUdfcFuncHandle;
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, SEpSet *pEpSet, char *udfName, SUdf *udf); int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf);
int32_t udfdLoadUdf(char *udfName, SEpSet *pEpSet, SUdf *udf) { int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
strcpy(udf->name, udfName); strcpy(udf->name, udfName);
udfdFillUdfInfoFromMNode(global.clientRpc, pEpSet, udf->name, udf); udfdFillUdfInfoFromMNode(global.clientRpc, udf->name, udf);
//strcpy(udf->path, "/home/slzhou/TDengine/debug/build/lib/libudf1.so");
int err = uv_dlopen(udf->path, &udf->lib); int err = uv_dlopen(udf->path, &udf->lib);
if (err != 0) { if (err != 0) {
fnError("can not load library %s. error: %s", udf->path, uv_strerror(err)); fnError("can not load library %s. error: %s", udf->path, uv_strerror(err));
// TODO set error return UDFC_CODE_LOAD_UDF_FAILURE;
} }
// TODO: find all the functions // TODO: find all the functions
char normalFuncName[TSDB_FUNC_NAME_LEN] = {0}; char normalFuncName[TSDB_FUNC_NAME_LEN] = {0};
@ -115,8 +122,8 @@ void udfdProcessRequest(uv_work_t *req) {
SUdf *udf = NULL; SUdf *udf = NULL;
uv_mutex_lock(&global.udfsMutex); uv_mutex_lock(&global.udfsMutex);
SUdf **udfInHash = taosHashGet(global.udfsHash, request.setup.udfName, TSDB_FUNC_NAME_LEN); SUdf **udfInHash = taosHashGet(global.udfsHash, request.setup.udfName, strlen(request.setup.udfName));
if (*udfInHash) { if (udfInHash) {
++(*udfInHash)->refCount; ++(*udfInHash)->refCount;
udf = *udfInHash; udf = *udfInHash;
uv_mutex_unlock(&global.udfsMutex); uv_mutex_unlock(&global.udfsMutex);
@ -128,14 +135,14 @@ void udfdProcessRequest(uv_work_t *req) {
uv_mutex_init(&udfNew->lock); uv_mutex_init(&udfNew->lock);
uv_cond_init(&udfNew->condReady); uv_cond_init(&udfNew->condReady);
udf = udfNew; udf = udfNew;
taosHashPut(global.udfsHash, request.setup.udfName, TSDB_FUNC_NAME_LEN, &udfNew, sizeof(&udfNew)); taosHashPut(global.udfsHash, request.setup.udfName, strlen(request.setup.udfName), &udfNew, sizeof(&udfNew));
uv_mutex_unlock(&global.udfsMutex); uv_mutex_unlock(&global.udfsMutex);
} }
uv_mutex_lock(&udf->lock); uv_mutex_lock(&udf->lock);
if (udf->state == UDF_STATE_INIT) { if (udf->state == UDF_STATE_INIT) {
udf->state = UDF_STATE_LOADING; udf->state = UDF_STATE_LOADING;
udfdLoadUdf(setup->udfName, &setup->epSet, udf); udfdLoadUdf(setup->udfName, udf);
udf->state = UDF_STATE_READY; udf->state = UDF_STATE_READY;
uv_cond_broadcast(&udf->condReady); uv_cond_broadcast(&udf->condReady);
uv_mutex_unlock(&udf->lock); uv_mutex_unlock(&udf->lock);
@ -214,7 +221,7 @@ void udfdProcessRequest(uv_work_t *req) {
udf->refCount--; udf->refCount--;
if (udf->refCount == 0) { if (udf->refCount == 0) {
unloadUdf = true; unloadUdf = true;
taosHashRemove(global.udfsHash, udf->name, TSDB_FUNC_NAME_LEN); taosHashRemove(global.udfsHash, udf->name, strlen(udf->name));
} }
uv_mutex_unlock(&global.udfsMutex); uv_mutex_unlock(&global.udfsMutex);
if (unloadUdf) { if (unloadUdf) {
@ -393,7 +400,48 @@ void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { return; } void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { return; }
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, SEpSet *pEpSet, char *udfName, SUdf *udf) { int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
pEpSet->version = 0;
// init mnode ip set
SEpSet* mgmtEpSet = &(pEpSet->epSet);
mgmtEpSet->numOfEps = 0;
mgmtEpSet->inUse = 0;
if (firstEp && firstEp[0] != 0) {
if (strlen(firstEp) >= TSDB_EP_LEN) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[0]);
if (code != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return terrno;
}
mgmtEpSet->numOfEps++;
}
if (secondEp && secondEp[0] != 0) {
if (strlen(secondEp) >= TSDB_EP_LEN) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
mgmtEpSet->numOfEps++;
}
if (mgmtEpSet->numOfEps == 0) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
return 0;
}
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
SRetrieveFuncReq retrieveReq = {0}; SRetrieveFuncReq retrieveReq = {0};
retrieveReq.numOfFuncs = 1; retrieveReq.numOfFuncs = 1;
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
@ -410,15 +458,21 @@ int32_t udfdFillUdfInfoFromMNode(void *clientRpc, SEpSet *pEpSet, char *udfName,
rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC; rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC;
SRpcMsg rpcRsp = {0}; SRpcMsg rpcRsp = {0};
rpcSendRecv(clientRpc, pEpSet, &rpcMsg, &rpcRsp); rpcSendRecv(clientRpc, &global.mgmtEp.epSet, &rpcMsg, &rpcRsp);
SRetrieveFuncRsp retrieveRsp = {0}; SRetrieveFuncRsp retrieveRsp = {0};
tDeserializeSRetrieveFuncRsp(rpcRsp.pCont, rpcRsp.contLen, &retrieveRsp); tDeserializeSRetrieveFuncRsp(rpcRsp.pCont, rpcRsp.contLen, &retrieveRsp);
SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0); SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
udf->funcType = pFuncInfo->funcType;
udf->scriptType = pFuncInfo->scriptType;
udf->outputType = pFuncInfo->funcType;
udf->outputLen = pFuncInfo->outputLen;
udf->bufSize = pFuncInfo->bufSize;
char path[PATH_MAX] = {0}; char path[PATH_MAX] = {0};
taosGetTmpfilePath("/tmp", "libudf", path); snprintf(path, sizeof(path), "%s/lib%s.so", "/tmp", udfName);
TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
// TODO check for failure of flush to disk // TODO check for failure of flush to disk
taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize); taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
taosCloseFile(&file); taosCloseFile(&file);
@ -531,15 +585,7 @@ static int32_t udfdUvInit() {
uv_pipe_open(&global.ctrlPipe, 0); uv_pipe_open(&global.ctrlPipe, 0);
uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb); uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb);
char dnodeId[8] = {0}; getUdfdPipeName(global.listenPipeName, UDF_LISTEN_PIPE_NAME_LEN);
size_t dnodeIdSize;
int32_t err = uv_os_getenv("DNODE_ID", dnodeId, &dnodeIdSize);
if (err != 0) {
dnodeId[0] = '1';
}
char listenPipeName[32] = {0};
snprintf(listenPipeName, sizeof(listenPipeName), "%s%s", UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId);
strcpy(global.listenPipeName, listenPipeName);
removeListeningPipe(); removeListeningPipe();
@ -550,7 +596,7 @@ static int32_t udfdUvInit() {
int r; int r;
fnInfo("bind to pipe %s", global.listenPipeName); fnInfo("bind to pipe %s", global.listenPipeName);
if ((r = uv_pipe_bind(&global.listeningPipe, listenPipeName))) { if ((r = uv_pipe_bind(&global.listeningPipe, global.listenPipeName))) {
fnError("Bind error %s", uv_err_name(r)); fnError("Bind error %s", uv_err_name(r));
removeListeningPipe(); removeListeningPipe();
return -1; return -1;
@ -580,7 +626,7 @@ static int32_t udfdRun() {
fnInfo("start the udfd"); fnInfo("start the udfd");
int code = uv_run(global.loop, UV_RUN_DEFAULT); int code = uv_run(global.loop, UV_RUN_DEFAULT);
fnInfo("udfd stopped. result: %s", uv_err_name(code)); fnInfo("udfd stopped. result: %s, code: %d", uv_err_name(code), code);
int codeClose = uv_loop_close(global.loop); int codeClose = uv_loop_close(global.loop);
fnDebug("uv loop close. result: %s", uv_err_name(codeClose)); fnDebug("uv loop close. result: %s", uv_err_name(codeClose));
udfdCloseClientRpc(); udfdCloseClientRpc();
@ -615,5 +661,6 @@ int main(int argc, char *argv[]) {
return -1; return -1;
} }
initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp);
return udfdRun(); return udfdRun();
} }

View File

@ -1,61 +1,84 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include "uv.h" #include "uv.h"
#include "fnLog.h"
#include "os.h" #include "os.h"
#include "tudf.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tglobal.h"
#include "tudf.h"
static int32_t parseArgs(int32_t argc, char *argv[]) {
for (int32_t i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-c") == 0) {
if (i < argc - 1) {
if (strlen(argv[++i]) >= PATH_MAX) {
printf("config file path overflow");
return -1;
}
tstrncpy(configDir, argv[i], PATH_MAX);
} else {
printf("'-c' requires a parameter, default is %s\n", configDir);
return -1;
}
}
}
return 0;
}
static int32_t initLog() {
char logName[12] = {0};
snprintf(logName, sizeof(logName), "%slog", "udfc");
return taosCreateLog(logName, 1, configDir, NULL, NULL, NULL, 0);
}
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
UdfcHandle udfc; parseArgs(argc, argv);
udfcOpen(1, &udfc); initLog();
uv_sleep(1000); if (taosInitCfg(configDir, NULL, NULL, NULL, 0) != 0) {
char path[256] = {0}; fnError("failed to start since read config error");
size_t cwdSize = 256; return -1;
int err = uv_cwd(path, &cwdSize); }
if (err != 0) {
fprintf(stderr, "err cwd: %s\n", uv_strerror(err)); udfcOpen();
return err; uv_sleep(1000);
UdfcFuncHandle handle;
setupUdf("udf1", &handle);
SSDataBlock block = {0};
SSDataBlock *pBlock = &block;
pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
pBlock->info.numOfCols = 1;
pBlock->info.rows = 4;
char data[16] = {0};
char bitmap[4] = {0};
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData colInfo = {0};
colInfo.info.type = TSDB_DATA_TYPE_INT;
colInfo.info.bytes = sizeof(int32_t);
colInfo.info.colId = 1;
colInfo.pData = data;
colInfo.nullbitmap = bitmap;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
colDataAppendInt32(&colInfo, j, &j);
} }
fprintf(stdout, "current working directory:%s\n", path); taosArrayPush(pBlock->pDataBlock, &colInfo);
strcat(path, "/libudf1.so"); }
UdfcFuncHandle handle; SScalarParam input = {0};
SEpSet epSet; input.numOfRows = pBlock->info.rows;
setupUdf(udfc, "udf1", &epSet, &handle); input.columnData = taosArrayGet(pBlock->pDataBlock, 0);
SScalarParam output = {0};
callUdfScalarFunc(handle, &input, 1, &output);
SSDataBlock block = {0}; SColumnInfoData *col = output.columnData;
SSDataBlock* pBlock = &block; for (int32_t i = 0; i < output.numOfRows; ++i) {
pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); fprintf(stderr, "%d\t%d\n", i, *(int32_t *)(col->pData + i * sizeof(int32_t)));
pBlock->info.numOfCols = 1; }
pBlock->info.rows = 4; teardownUdf(handle);
char data[16] = {0}; udfcClose();
char bitmap[4] = {0};
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData colInfo = {0};
colInfo.info.type = TSDB_DATA_TYPE_INT;
colInfo.info.bytes = sizeof(int32_t);
colInfo.info.colId = 1;
colInfo.pData = data;
colInfo.nullbitmap = bitmap;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
colDataAppendInt32(&colInfo, j, &j);
}
taosArrayPush(pBlock->pDataBlock, &colInfo);
}
SScalarParam input = {0};
input.numOfRows = pBlock->info.rows;
input.columnData = taosArrayGet(pBlock->pDataBlock, 0);
SScalarParam output = {0};
callUdfScalarFunc(handle, &input, 1 , &output);
SColumnInfoData *col = output.columnData;
for (int32_t i = 0; i < output.numOfRows; ++i) {
fprintf(stderr, "%d\t%d\n" , i, *(int32_t*)(col->pData + i *sizeof(int32_t)));
}
teardownUdf(handle);
udfcClose(udfc);
} }

View File

@ -32,8 +32,8 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu
float *in = (float *)pInputData->pData; float *in = (float *)pInputData->pData;
float *out = (float *)pOutputData->pData; float *out = (float *)pOutputData->pData;
for (int32_t i = 0; i < pInput->numOfRows; ++i) { for (int32_t i = 0; i < pInput->numOfRows; ++i) {
if (colDataIsNull_f(pInputData->nullbitmap, i)) { if (colDataIsNull_s(pInputData, i)) {
colDataSetNull_f(pOutputData->nullbitmap, i); colDataAppendNULL(pOutputData, i);
continue; continue;
} }
out[i] = (in[i] >= 0)? in[i] : -in[i]; out[i] = (in[i] >= 0)? in[i] : -in[i];
@ -45,8 +45,8 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu
double *in = (double *)pInputData->pData; double *in = (double *)pInputData->pData;
double *out = (double *)pOutputData->pData; double *out = (double *)pOutputData->pData;
for (int32_t i = 0; i < pInput->numOfRows; ++i) { for (int32_t i = 0; i < pInput->numOfRows; ++i) {
if (colDataIsNull_f(pInputData->nullbitmap, i)) { if (colDataIsNull_s(pInputData, i)) {
colDataSetNull_f(pOutputData->nullbitmap, i); colDataAppendNULL(pOutputData, i);
continue; continue;
} }
out[i] = (in[i] >= 0)? in[i] : -in[i]; out[i] = (in[i] >= 0)? in[i] : -in[i];
@ -58,8 +58,8 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu
int8_t *in = (int8_t *)pInputData->pData; int8_t *in = (int8_t *)pInputData->pData;
int8_t *out = (int8_t *)pOutputData->pData; int8_t *out = (int8_t *)pOutputData->pData;
for (int32_t i = 0; i < pInput->numOfRows; ++i) { for (int32_t i = 0; i < pInput->numOfRows; ++i) {
if (colDataIsNull_f(pInputData->nullbitmap, i)) { if (colDataIsNull_s(pInputData, i)) {
colDataSetNull_f(pOutputData->nullbitmap, i); colDataAppendNULL(pOutputData, i);
continue; continue;
} }
out[i] = (in[i] >= 0)? in[i] : -in[i]; out[i] = (in[i] >= 0)? in[i] : -in[i];
@ -71,8 +71,8 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu
int16_t *in = (int16_t *)pInputData->pData; int16_t *in = (int16_t *)pInputData->pData;
int16_t *out = (int16_t *)pOutputData->pData; int16_t *out = (int16_t *)pOutputData->pData;
for (int32_t i = 0; i < pInput->numOfRows; ++i) { for (int32_t i = 0; i < pInput->numOfRows; ++i) {
if (colDataIsNull_f(pInputData->nullbitmap, i)) { if (colDataIsNull_s(pInputData, i)) {
colDataSetNull_f(pOutputData->nullbitmap, i); colDataAppendNULL(pOutputData, i);
continue; continue;
} }
out[i] = (in[i] >= 0)? in[i] : -in[i]; out[i] = (in[i] >= 0)? in[i] : -in[i];
@ -84,8 +84,8 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu
int32_t *in = (int32_t *)pInputData->pData; int32_t *in = (int32_t *)pInputData->pData;
int32_t *out = (int32_t *)pOutputData->pData; int32_t *out = (int32_t *)pOutputData->pData;
for (int32_t i = 0; i < pInput->numOfRows; ++i) { for (int32_t i = 0; i < pInput->numOfRows; ++i) {
if (colDataIsNull_f(pInputData->nullbitmap, i)) { if (colDataIsNull_s(pInputData, i)) {
colDataSetNull_f(pOutputData->nullbitmap, i); colDataAppendNULL(pOutputData, i);
continue; continue;
} }
out[i] = (in[i] >= 0)? in[i] : -in[i]; out[i] = (in[i] >= 0)? in[i] : -in[i];
@ -97,8 +97,8 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu
int64_t *in = (int64_t *)pInputData->pData; int64_t *in = (int64_t *)pInputData->pData;
int64_t *out = (int64_t *)pOutputData->pData; int64_t *out = (int64_t *)pOutputData->pData;
for (int32_t i = 0; i < pInput->numOfRows; ++i) { for (int32_t i = 0; i < pInput->numOfRows; ++i) {
if (colDataIsNull_f(pInputData->nullbitmap, i)) { if (colDataIsNull_s(pInputData, i)) {
colDataSetNull_f(pOutputData->nullbitmap, i); colDataAppendNULL(pOutputData, i);
continue; continue;
} }
out[i] = (in[i] >= 0)? in[i] : -in[i]; out[i] = (in[i] >= 0)? in[i] : -in[i];
@ -129,8 +129,8 @@ static int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SS
double *out = (double *)pOutputData->pData; double *out = (double *)pOutputData->pData;
for (int32_t i = 0; i < pInput->numOfRows; ++i) { for (int32_t i = 0; i < pInput->numOfRows; ++i) {
if (colDataIsNull_f(pInputData->nullbitmap, i)) { if (colDataIsNull_s(pInputData, i)) {
colDataSetNull_f(pOutputData->nullbitmap, i); colDataAppendNULL(pOutputData, i);
continue; continue;
} }
out[i] = valFn(getValueFn(pInputData->pData, i)); out[i] = valFn(getValueFn(pInputData->pData, i));
@ -157,9 +157,9 @@ static int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, S
double *out = (double *)pOutputData->pData; double *out = (double *)pOutputData->pData;
for (int32_t i = 0; i < pInput->numOfRows; ++i) { for (int32_t i = 0; i < pInput->numOfRows; ++i) {
if (colDataIsNull_f(pInputData[0]->nullbitmap, i) || if (colDataIsNull_s(pInputData[0], i) ||
colDataIsNull_f(pInputData[1]->nullbitmap, 0)) { colDataIsNull_s(pInputData[1], 0)) {
colDataSetNull_f(pOutputData->nullbitmap, i); colDataAppendNULL(pOutputData, i);
continue; continue;
} }
out[i] = valFn(getValueFn[0](pInputData[0]->pData, i), getValueFn[1](pInputData[1]->pData, 0)); out[i] = valFn(getValueFn[0](pInputData[0]->pData, i), getValueFn[1](pInputData[1]->pData, 0));
@ -184,8 +184,8 @@ static int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarP
float *out = (float *)pOutputData->pData; float *out = (float *)pOutputData->pData;
for (int32_t i = 0; i < pInput->numOfRows; ++i) { for (int32_t i = 0; i < pInput->numOfRows; ++i) {
if (colDataIsNull_f(pInputData->nullbitmap, i)) { if (colDataIsNull_s(pInputData, i)) {
colDataSetNull_f(pOutputData->nullbitmap, i); colDataAppendNULL(pOutputData, i);
continue; continue;
} }
out[i] = f1(in[i]); out[i] = f1(in[i]);
@ -198,8 +198,8 @@ static int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarP
double *out = (double *)pOutputData->pData; double *out = (double *)pOutputData->pData;
for (int32_t i = 0; i < pInput->numOfRows; ++i) { for (int32_t i = 0; i < pInput->numOfRows; ++i) {
if (colDataIsNull_f(pInputData->nullbitmap, i)) { if (colDataIsNull_s(pInputData, i)) {
colDataSetNull_f(pOutputData->nullbitmap, i); colDataAppendNULL(pOutputData, i);
continue; continue;
} }
out[i] = d1(in[i]); out[i] = d1(in[i]);
@ -301,7 +301,7 @@ static int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarP
for (int32_t i = 0; i < pInput->numOfRows; ++i) { for (int32_t i = 0; i < pInput->numOfRows; ++i) {
if (colDataIsNull_s(pInputData, i)) { if (colDataIsNull_s(pInputData, i)) {
colDataSetNull_f(pOutputData->nullbitmap, i); colDataAppendNULL(pOutputData, i);
continue; continue;
} }

View File

@ -0,0 +1,103 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
$loop_cnt = 0
check_dnode_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05
if $data00 != 1 then
return -1
endi
if $data04 != ready then
goto check_dnode_ready
endi
sql connect
print =============== create database
sql create database db
sql show databases
if $rows != 2 then
return -1
endi
sql use db
print =============== create super table and child table
sql create table stb1 (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) tags (t1 int)
sql show stables
print $rows $data00 $data01 $data02
if $rows != 1 then
return -1
endi
sql create table ct1 using stb1 tags ( 1 )
sql create table ct2 using stb1 tags ( 2 )
sql create table ct3 using stb1 tags ( 3 )
sql create table ct4 using stb1 tags ( 4 )
sql show tables
print $rows $data00 $data10 $data20
if $rows != 4 then
return -1
endi
print =============== insert data into child table ct1 (s)
sql insert into ct1 values ( '2022-01-01 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now+1a )
sql insert into ct1 values ( '2022-01-01 01:01:06.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now+2a )
sql insert into ct1 values ( '2022-01-01 01:01:10.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now+3a )
sql insert into ct1 values ( '2022-01-01 01:01:16.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now+4a )
sql insert into ct1 values ( '2022-01-01 01:01:20.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now+5a )
sql insert into ct1 values ( '2022-01-01 01:01:26.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now+6a )
sql insert into ct1 values ( '2022-01-01 01:01:30.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", now+7a )
sql insert into ct1 values ( '2022-01-01 01:01:36.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", now+8a )
print =============== insert data into child table ct2 (d)
sql insert into ct2 values ( '2022-01-01 01:00:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now+1a )
sql insert into ct2 values ( '2022-01-01 10:00:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now+2a )
sql insert into ct2 values ( '2022-01-01 20:00:01.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now+3a )
sql insert into ct2 values ( '2022-01-02 10:00:01.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now+4a )
sql insert into ct2 values ( '2022-01-02 20:00:01.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now+5a )
sql insert into ct2 values ( '2022-01-03 10:00:01.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", now+6a )
sql insert into ct2 values ( '2022-01-03 20:00:01.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", now+7a )
print =============== insert data into child table ct3 (n)
sql insert into ct3 values ( '2021-12-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
sql insert into ct3 values ( '2021-12-31 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now+1a )
sql insert into ct3 values ( '2022-01-01 01:01:06.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now+2a )
sql insert into ct3 values ( '2022-01-07 01:01:10.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now+3a )
sql insert into ct3 values ( '2022-01-31 01:01:16.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now+4a )
sql insert into ct3 values ( '2022-02-01 01:01:20.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now+5a )
sql insert into ct3 values ( '2022-02-28 01:01:26.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now+6a )
sql insert into ct3 values ( '2022-03-01 01:01:30.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" )
sql insert into ct3 values ( '2022-03-08 01:01:36.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" )
print =============== insert data into child table ct4 (y)
sql insert into ct4 values ( '2019-01-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
sql insert into ct4 values ( '2019-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now+1a )
sql insert into ct4 values ( '2019-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now+2a )
sql insert into ct4 values ( '2020-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now+3a )
sql insert into ct4 values ( '2020-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now+4a )
sql insert into ct4 values ( '2020-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now+5a )
sql insert into ct4 values ( '2020-12-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
sql insert into ct4 values ( '2021-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now+6a )
sql insert into ct4 values ( '2021-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" )
sql insert into ct4 values ( '2021-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" )
sql insert into ct4 values ( '2022-02-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" )
sql insert into ct4 values ( '2022-05-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
print ================ start query ======================
print ================ SQL used to cause taosd or taos shell crash
sql select sum(c1) ,count(c1) from ct4 group by c1 having sum(c10) between 0 and 1 ;
#system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -6,6 +6,7 @@ import inspect
from util.log import * from util.log import *
from util.sql import * from util.sql import *
from util.cases import * from util.cases import *
from util.dnodes import *
@ -15,56 +16,50 @@ class TDTestCase:
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor()) tdSql.init(conn.cursor())
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring def __cast_to_bigint(self, col_name, tbname):
tdSql.prepare() __sql = f"select cast({col_name} as bigint), {col_name} from {tbname}"
tdSql.query(sql=__sql)
data_tb_col = [result[1] for result in tdSql.queryResult]
for i in range(len(tdSql.queryRows)):
tdSql.checkData( i, 0, None ) if data_tb_col[i] is None else tdSql.checkData( i, 0, int(data_tb_col[i]) )
tdLog.printNoPrefix("==========step1:create table") def __range_to_bigint(self,cols,tables):
tdSql.execute( for col in cols:
'''create table stb1 for table in tables:
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) self.__cast_to_bigint(col_name=col, tbname=table)
tags (t1 int)
'''
)
tdSql.execute(
'''
create table t1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
'''
)
for i in range(4):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )')
tdLog.printNoPrefix("==========step2:insert data") def __cast_to_timestamp(self, col_name, tbname):
for i in range(9): __sql = f"select cast({col_name} as timestamp), {col_name} from {tbname}"
tdSql.execute( tdSql.query(sql=__sql)
f"insert into ct1 values ( now()-{i*10}s, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )" data_tb_col = [result[1] for result in tdSql.queryResult]
) for i in range(len(tdSql.queryRows)):
tdSql.execute( if data_tb_col[i] is None:
f"insert into ct4 values ( now()-{i*90}d, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )" tdSql.checkData( i, 0 , None )
) if (col_name == "c2" or col_name == "double" ) and tbname == "t1" and i == 10:
tdSql.execute("insert into ct1 values (now()-45s, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', now()+8a )") continue
tdSql.execute("insert into ct1 values (now()+10s, 9, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )") else:
utc_zone = datetime.timezone.utc
utc_8 = datetime.timezone(datetime.timedelta(hours=8))
date_init_stamp = datetime.datetime.utcfromtimestamp(data_tb_col[i]/1000)
date_data = date_init_stamp.replace(tzinfo=utc_zone).astimezone(utc_8).strftime("%Y-%m-%d %H:%M:%S.%f")
tdSql.checkData( i, 0, date_data)
tdSql.execute("insert into ct4 values (now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ") def __range_to_timestamp(self, cols, tables):
tdSql.execute("insert into ct4 values (now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ") for col in cols:
tdSql.execute("insert into ct4 values (now()+90d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ") for table in tables:
self.__cast_to_timestamp(col_name=col, tbname=table)
tdSql.execute( def __test_bigint(self):
f'''insert into t1 values __table_list = ["ct1", "ct4", "t1"]
( '2020-04-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) __col_list = ["c1","c2","c3","c4","c5","c6","c7","c10","c1+c2"]
( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a ) self.__range_to_bigint(cols=__col_list, tables=__table_list)
( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a )
( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a ) def __test_timestamp(self):
( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now()+4a ) __table_list = ["ct1", "ct4", "t1"]
( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) __col_list = ["c1","c2","c3","c4","c5","c6","c7","c1+c2"]
( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a ) self.__range_to_timestamp(cols=__col_list, tables=__table_list)
( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a )
( '2022-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" ) def all_test(self):
( '2022-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" )
( '2022-12-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" )
( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
'''
)
tdSql.query("select c1 from ct4") tdSql.query("select c1 from ct4")
data_ct4_c1 = [tdSql.getData(i,0) for i in range(tdSql.queryRows)] data_ct4_c1 = [tdSql.getData(i,0) for i in range(tdSql.queryRows)]
@ -82,9 +77,9 @@ class TDTestCase:
tdLog.printNoPrefix("==========step5: cast int to binary, expect changes to str(int) ") tdLog.printNoPrefix("==========step5: cast int to binary, expect changes to str(int) ")
tdSql.query("select cast(c1 as binary(32)) as b from ct4") #tdSql.query("select cast(c1 as binary(32)) as b from ct4")
for i in range(len(data_ct4_c1)): #for i in range(len(data_ct4_c1)):
tdSql.checkData( i, 0, str(data_ct4_c1[i]) ) # tdSql.checkData( i, 0, str(data_ct4_c1[i]) )
tdSql.query("select cast(c1 as binary(32)) as b from t1") tdSql.query("select cast(c1 as binary(32)) as b from t1")
for i in range(len(data_t1_c1)): for i in range(len(data_t1_c1)):
tdSql.checkData( i, 0, str(data_t1_c1[i]) ) tdSql.checkData( i, 0, str(data_t1_c1[i]) )
@ -240,7 +235,7 @@ class TDTestCase:
tdSql.checkData( i, 0, date_data) tdSql.checkData( i, 0, date_data)
tdLog.printNoPrefix("==========step16: cast smallint to bigint, expect no changes") tdLog.printNoPrefix("==========step16: cast tinyint to bigint, expect no changes")
tdSql.query("select c4 from ct4") tdSql.query("select c4 from ct4")
data_ct4_c4 = [tdSql.getData(i,0) for i in range(tdSql.queryRows)] data_ct4_c4 = [tdSql.getData(i,0) for i in range(tdSql.queryRows)]
tdSql.query("select c4 from t1") tdSql.query("select c4 from t1")
@ -254,7 +249,7 @@ class TDTestCase:
tdSql.checkData( i, 0, data_t1_c4[i]) tdSql.checkData( i, 0, data_t1_c4[i])
tdLog.printNoPrefix("==========step17: cast smallint to binary, expect changes to str(int) ") tdLog.printNoPrefix("==========step17: cast tinyint to binary, expect changes to str(int) ")
tdSql.query("select cast(c4 as binary(32)) as b from ct4") tdSql.query("select cast(c4 as binary(32)) as b from ct4")
for i in range(len(data_ct4_c4)): for i in range(len(data_ct4_c4)):
@ -263,7 +258,7 @@ class TDTestCase:
for i in range(len(data_t1_c4)): for i in range(len(data_t1_c4)):
tdSql.checkData( i, 0, str(data_t1_c4[i]) ) tdSql.checkData( i, 0, str(data_t1_c4[i]) )
tdLog.printNoPrefix("==========step18: cast smallint to nchar, expect changes to str(int) ") tdLog.printNoPrefix("==========step18: cast tinyint to nchar, expect changes to str(int) ")
tdSql.query("select cast(c4 as nchar(32)) as b from ct4") tdSql.query("select cast(c4 as nchar(32)) as b from ct4")
for i in range(len(data_ct4_c4)): for i in range(len(data_ct4_c4)):
@ -272,7 +267,7 @@ class TDTestCase:
for i in range(len(data_t1_c4)): for i in range(len(data_t1_c4)):
tdSql.checkData( i, 0, str(data_t1_c4[i]) ) tdSql.checkData( i, 0, str(data_t1_c4[i]) )
tdLog.printNoPrefix("==========step19: cast smallint to timestamp, expect changes to timestamp ") tdLog.printNoPrefix("==========step19: cast tinyint to timestamp, expect changes to timestamp ")
tdSql.query("select cast(c4 as timestamp) as b from ct4") tdSql.query("select cast(c4 as timestamp) as b from ct4")
for i in range(len(data_ct4_c4)): for i in range(len(data_ct4_c4)):
@ -624,7 +619,67 @@ class TDTestCase:
tdSql.error("select cast(c8 as timestamp ) as b from ct4") tdSql.error("select cast(c8 as timestamp ) as b from ct4")
tdSql.error("select cast(c9 as timestamp ) as b from ct4") tdSql.error("select cast(c9 as timestamp ) as b from ct4")
tdSql.error("select cast(c9 as binary(64) ) as b from ct4") tdSql.error("select cast(c9 as binary(64) ) as b from ct4")
pass
def run(self):
tdSql.prepare()
tdLog.printNoPrefix("==========step1:create table")
tdSql.execute(
'''create table stb1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
tags (t1 int)
'''
)
tdSql.execute(
'''
create table t1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
'''
)
for i in range(4):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )')
tdLog.printNoPrefix("==========step2:insert data")
for i in range(9):
tdSql.execute(
f"insert into ct1 values ( now()-{i*10}s, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
)
tdSql.execute(
f"insert into ct4 values ( now()-{i*90}d, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
)
tdSql.execute("insert into ct1 values (now()-45s, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', now()+8a )")
tdSql.execute("insert into ct1 values (now()+10s, 9, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )")
tdSql.execute("insert into ct4 values (now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
tdSql.execute("insert into ct4 values (now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
tdSql.execute("insert into ct4 values (now()+90d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
tdSql.execute(
f'''insert into t1 values
( '2020-04-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a )
( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a )
( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a )
( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now()+4a )
( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a )
( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a )
( '2022-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" )
( '2022-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" )
( '2022-12-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" )
( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
'''
)
self.all_test()
tdDnodes.stop(1)
tdDnodes.start(1)
tdSql.execute("use db")
self.all_test()
def stop(self): def stop(self):
tdSql.close() tdSql.close()

View File

@ -0,0 +1,232 @@
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
INT_COL = "c1"
BINT_COL = "c2"
SINT_COL = "c3"
TINT_COL = "c4"
FLOAT_COL = "c5"
DOUBLE_COL = "c6"
BOOL_COL = "c7"
BINARY_COL = "c8"
NCHAR_COL = "c9"
TS_COL = "c10"
NUM_COL = [INT_COL, BINT_COL, SINT_COL, TINT_COL, FLOAT_COL, DOUBLE_COL, ]
UN_NUM_COL = [BOOL_COL, BINARY_COL, NCHAR_COL, ]
TS_TYPE_COL = [TS_COL]
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
def __sum_condition(self):
sum_condition = []
for num_col in NUM_COL:
sum_condition.extend(
(
num_col,
f"ceil( {num_col} )",
)
)
sum_condition.extend( f"{num_col} + {num_col_2}" for num_col_2 in NUM_COL )
sum_condition.extend( f"{num_col} + {un_num_col} " for un_num_col in UN_NUM_COL )
sum_condition.append(1)
return sum_condition
def __where_condition(self, col):
return f" where abs( {col} ) < 1000000 "
def __group_condition(self, col, having = ""):
return f" group by {col} having {having}" if having else f" group by {col} "
def __sum_current_check(self, tbname):
sum_condition = self.__sum_condition()
for condition in sum_condition:
where_condition = self.__where_condition(condition)
group_condition = self.__group_condition(condition, having=f"{condition} is not null " )
tdSql.query(f"select {condition} from {tbname} {where_condition} ")
datas = [tdSql.getData(i,0) for i in range(tdSql.queryRows)]
sum_data = sum(filter(None, datas))
tdSql.query(f"select sum( {condition} ) from {tbname} {where_condition} ")
tdSql.checkData(0, 0, sum_data)
tdSql.query(f"select {condition} from {tbname} {where_condition} {group_condition} ")
def __sum_err_check(self,tbanme):
sqls = []
for un_num_col in UN_NUM_COL:
sqls.extend(
(
f"select sum( {un_num_col} ) from {tbanme} ",
f"select sum(ceil( {un_num_col} )) from {tbanme} ",
)
)
sqls.extend( f"select sum( {un_num_col} + {un_num_col_2} ) from {tbanme} " for un_num_col_2 in UN_NUM_COL )
sqls.extend( f"select sum( {num_col} + {ts_col} ) from {tbanme} " for num_col in NUM_COL for ts_col in TS_TYPE_COL)
sqls.extend(
(
f"select sum() from {tbanme} ",
f"select sum(*) from {tbanme} ",
f"select sum(ccccccc) from {tbanme} ",
f"select sum('test') from {tbanme} ",
)
)
return sqls
def __test_current(self):
tdLog.printNoPrefix("==========current sql condition check , must return query ok==========")
tbname = ["ct1", "ct2", "ct4", "t1"]
for tb in tbname:
self.__sum_current_check(tb)
tdLog.printNoPrefix(f"==========current sql condition check in {tb} over==========")
def __test_error(self):
tdLog.printNoPrefix("==========err sql condition check , must return error==========")
tbname = ["ct1", "ct2", "ct4", "t1"]
for tb in tbname:
for errsql in self.__sum_err_check(tb):
tdSql.error(sql=errsql)
tdLog.printNoPrefix(f"==========err sql condition check in {tb} over==========")
def all_test(self):
self.__test_current()
self.__test_error()
def __create_tb(self):
tdSql.prepare()
tdLog.printNoPrefix("==========step1:create table")
create_stb_sql = f'''create table stb1(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp
) tags (t1 int)
'''
create_ntb_sql = f'''create table t1(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp
)
'''
tdSql.execute(create_stb_sql)
tdSql.execute(create_ntb_sql)
for i in range(4):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )')
def __insert_data(self, rows):
for i in range(9):
tdSql.execute(
f"insert into ct1 values ( now()-{i*10}s, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
)
tdSql.execute(
f"insert into ct4 values ( now()-{i*90}d, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
)
tdSql.execute(
f"insert into ct2 values ( now()-{i*90}d, {-1*i}, {-11111*i}, {-111*i}, {-11*i}, {-1.11*i}, {-11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
)
tdSql.execute(
'''insert into ct1 values
( now()-45s, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', now()+8a )
( now()+10s, 9, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )
'''
)
tdSql.execute(
f'''insert into ct4 values
( now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( now()+{rows * 9}d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
(
now()+{rows * 9-10}d, {pow(2,31)-pow(2,15)}, {pow(2,63)-pow(2,30)}, 32767, 127,
{ 3.3 * pow(10,38) }, { 1.3 * pow(10,308) }, { rows % 2 }, "binary_limit-1", "nachar_limit-1", now()-1d
)
(
now()+{rows * 9-20}d, {pow(2,31)-pow(2,16)}, {pow(2,63)-pow(2,31)}, 32766, 126,
{ 3.2 * pow(10,38) }, { 1.2 * pow(10,308) }, { (rows-1) % 2 }, "binary_limit-2", "nachar_limit-2", now()-2d
)
'''
)
tdSql.execute(
f'''insert into ct2 values
( now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( now()+{rows * 9}d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
(
now()+{rows * 9-10}d, { -1 * pow(2,31) + pow(2,15) }, { -1 * pow(2,63) + pow(2,30) }, -32766, -126,
{ -1 * 3.2 * pow(10,38) }, { -1.2 * pow(10,308) }, { rows % 2 }, "binary_limit-1", "nachar_limit-1", now()-1d
)
(
now()+{rows * 9-20}d, { -1 * pow(2,31) + pow(2,16) }, { -1 * pow(2,63) + pow(2,31) }, -32767, -127,
{ - 3.3 * pow(10,38) }, { -1.3 * pow(10,308) }, { (rows-1) % 2 }, "binary_limit-2", "nachar_limit-2", now()-2d
)
'''
)
for i in range(rows):
insert_data = f'''insert into t1 values
( now()-{i}h, {i}, {i}, { i % 32767 }, { i % 127}, { i * 1.11111 }, { i * 1000.1111 }, { i % 2},
"binary_{i}", "nchar_{i}", now()-{i}s )
'''
tdSql.execute(insert_data)
tdSql.execute(
f'''insert into t1 values
( now() + 3h, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( now()-{ ( rows // 2 ) * 60 + 30 }m, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( now()-{rows}h, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( now() + 2h, { pow(2,31) - pow(2,15) }, { pow(2,63) - pow(2,30) }, 32767, 127,
{ 3.3 * pow(10,38) }, { 1.3 * pow(10,308) }, { rows % 2 },
"binary_limit-1", "nachar_limit-1", now()-1d
)
(
now() + 1h , { pow(2,31) - pow(2,16) }, { pow(2,63) - pow(2,31) }, 32766, 126,
{ 3.2 * pow(10,38) }, { 1.2 * pow(10,308) }, { (rows-1) % 2 },
"binary_limit-2", "nachar_limit-2", now()-2d
)
'''
)
def run(self):
tdSql.prepare()
tdLog.printNoPrefix("==========step1:create table")
self.__create_tb()
tdLog.printNoPrefix("==========step2:insert data")
self.__insert_data(100)
tdLog.printNoPrefix("==========step3:all check")
self.all_test()
# tdDnodes.stop(1)
# tdDnodes.start(1)
# tdSql.execute("use db")
# tdLog.printNoPrefix("==========step4:after wal, all check again ")
# self.all_test()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())