Merge branch '3.0' into feature/TD-13066-3.0
This commit is contained in:
commit
8d2754259d
|
@ -654,6 +654,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_UDF_PIPE_NO_PIPE TAOS_DEF_ERROR_CODE(0, 0x2904)
|
||||
#define TSDB_CODE_UDF_LOAD_UDF_FAILURE TAOS_DEF_ERROR_CODE(0, 0x2905)
|
||||
#define TSDB_CODE_UDF_INVALID_STATE TAOS_DEF_ERROR_CODE(0, 0x2906)
|
||||
#define TSDB_CODE_UDF_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x2907)
|
||||
|
||||
#define TSDB_CODE_SML_INVALID_PROTOCOL_TYPE TAOS_DEF_ERROR_CODE(0, 0x3000)
|
||||
#define TSDB_CODE_SML_INVALID_PRECISION_TYPE TAOS_DEF_ERROR_CODE(0, 0x3001)
|
||||
|
|
|
@ -444,7 +444,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, 1) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1;
|
||||
|
||||
if (cfgAddBool(pCfg, "startUdfd", tsStartUdfd, 0) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -585,7 +585,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
|||
tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32;
|
||||
tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32;
|
||||
|
||||
tsStartUdfd = cfgGetItem(pCfg, "startUdfd")->bval;
|
||||
tsStartUdfd = cfgGetItem(pCfg, "udf")->bval;
|
||||
|
||||
if (tsQueryBufferSize >= 0) {
|
||||
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
|
||||
|
|
|
@ -1497,7 +1497,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
|
|||
taosArrayDestroy(tempBlock.pDataBlock);
|
||||
|
||||
taosMemoryFree(newState.buf);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return udfCode;
|
||||
}
|
||||
|
||||
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
|
||||
|
|
|
@ -140,6 +140,182 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void udfdProcessSetupRequest(SUvUdfWork* uvUdf, SUdfRequest* request) {
|
||||
// TODO: tracable id from client. connect, setup, call, teardown
|
||||
fnInfo("%" PRId64 " setup request. udf name: %s", request->seqNum, request->setup.udfName);
|
||||
SUdfSetupRequest *setup = &request->setup;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SUdf *udf = NULL;
|
||||
uv_mutex_lock(&global.udfsMutex);
|
||||
SUdf **udfInHash = taosHashGet(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName));
|
||||
if (udfInHash) {
|
||||
++(*udfInHash)->refCount;
|
||||
udf = *udfInHash;
|
||||
uv_mutex_unlock(&global.udfsMutex);
|
||||
} else {
|
||||
SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
|
||||
udfNew->refCount = 1;
|
||||
udfNew->state = UDF_STATE_INIT;
|
||||
|
||||
uv_mutex_init(&udfNew->lock);
|
||||
uv_cond_init(&udfNew->condReady);
|
||||
udf = udfNew;
|
||||
taosHashPut(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName), &udfNew, sizeof(&udfNew));
|
||||
uv_mutex_unlock(&global.udfsMutex);
|
||||
}
|
||||
|
||||
uv_mutex_lock(&udf->lock);
|
||||
if (udf->state == UDF_STATE_INIT) {
|
||||
udf->state = UDF_STATE_LOADING;
|
||||
code = udfdLoadUdf(setup->udfName, udf);
|
||||
if (udf->initFunc) {
|
||||
udf->initFunc();
|
||||
}
|
||||
udf->state = UDF_STATE_READY;
|
||||
uv_cond_broadcast(&udf->condReady);
|
||||
uv_mutex_unlock(&udf->lock);
|
||||
} else {
|
||||
while (udf->state != UDF_STATE_READY) {
|
||||
uv_cond_wait(&udf->condReady, &udf->lock);
|
||||
}
|
||||
uv_mutex_unlock(&udf->lock);
|
||||
}
|
||||
SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle));
|
||||
handle->udf = udf;
|
||||
|
||||
SUdfResponse rsp;
|
||||
rsp.seqNum = request->seqNum;
|
||||
rsp.type = request->type;
|
||||
rsp.code = code;
|
||||
rsp.setupRsp.udfHandle = (int64_t)(handle);
|
||||
rsp.setupRsp.outputType = udf->outputType;
|
||||
rsp.setupRsp.outputLen = udf->outputLen;
|
||||
rsp.setupRsp.bufSize = udf->bufSize;
|
||||
|
||||
int32_t len = encodeUdfResponse(NULL, &rsp);
|
||||
rsp.msgLen = len;
|
||||
void *bufBegin = taosMemoryMalloc(len);
|
||||
void *buf = bufBegin;
|
||||
encodeUdfResponse(&buf, &rsp);
|
||||
|
||||
uvUdf->output = uv_buf_init(bufBegin, len);
|
||||
|
||||
taosMemoryFree(uvUdf->input.base);
|
||||
return;
|
||||
}
|
||||
|
||||
void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
||||
SUdfCallRequest *call = &request->call;
|
||||
fnDebug("%" PRId64 "call request. call type %d, handle: %" PRIx64, request->seqNum, call->callType,
|
||||
call->udfHandle);
|
||||
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle);
|
||||
SUdf *udf = handle->udf;
|
||||
SUdfResponse response = {0};
|
||||
SUdfResponse *rsp = &response;
|
||||
SUdfCallResponse *subRsp = &rsp->callRsp;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
switch(call->callType) {
|
||||
case TSDB_UDF_CALL_SCALA_PROC: {
|
||||
SUdfColumn output = {0};
|
||||
|
||||
SUdfDataBlock input = {0};
|
||||
convertDataBlockToUdfDataBlock(&call->block, &input);
|
||||
code = udf->scalarProcFunc(&input, &output);
|
||||
|
||||
convertUdfColumnToDataBlock(&output, &response.callRsp.resultData);
|
||||
freeUdfColumn(&output);
|
||||
break;
|
||||
}
|
||||
case TSDB_UDF_CALL_AGG_INIT: {
|
||||
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
|
||||
.bufLen= udf->bufSize,
|
||||
.numOfResult = 0};
|
||||
udf->aggStartFunc(&outBuf);
|
||||
subRsp->resultBuf = outBuf;
|
||||
break;
|
||||
}
|
||||
case TSDB_UDF_CALL_AGG_PROC: {
|
||||
SUdfDataBlock input = {0};
|
||||
convertDataBlockToUdfDataBlock(&call->block, &input);
|
||||
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
|
||||
.bufLen= udf->bufSize,
|
||||
.numOfResult = 0};
|
||||
code = udf->aggProcFunc(&input, &call->interBuf, &outBuf);
|
||||
subRsp->resultBuf = outBuf;
|
||||
|
||||
break;
|
||||
}
|
||||
case TSDB_UDF_CALL_AGG_FIN: {
|
||||
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
|
||||
.bufLen= udf->bufSize,
|
||||
.numOfResult = 0};
|
||||
code = udf->aggFinishFunc(&call->interBuf, &outBuf);
|
||||
subRsp->resultBuf = outBuf;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
rsp->seqNum = request->seqNum;
|
||||
rsp->type = request->type;
|
||||
rsp->code = code;
|
||||
subRsp->callType = call->callType;
|
||||
|
||||
int32_t len = encodeUdfResponse(NULL, rsp);
|
||||
rsp->msgLen = len;
|
||||
void *bufBegin = taosMemoryMalloc(len);
|
||||
void *buf = bufBegin;
|
||||
encodeUdfResponse(&buf, rsp);
|
||||
uvUdf->output = uv_buf_init(bufBegin, len);
|
||||
|
||||
taosMemoryFree(uvUdf->input.base);
|
||||
return;
|
||||
}
|
||||
|
||||
void udfdProcessTeardownRequest(SUvUdfWork* uvUdf, SUdfRequest* request) {
|
||||
SUdfTeardownRequest *teardown = &request->teardown;
|
||||
fnInfo("teardown. %" PRId64 "handle:%" PRIx64, request->seqNum, teardown->udfHandle);
|
||||
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle);
|
||||
SUdf *udf = handle->udf;
|
||||
bool unloadUdf = false;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
uv_mutex_lock(&global.udfsMutex);
|
||||
udf->refCount--;
|
||||
if (udf->refCount == 0) {
|
||||
unloadUdf = true;
|
||||
taosHashRemove(global.udfsHash, udf->name, strlen(udf->name));
|
||||
}
|
||||
uv_mutex_unlock(&global.udfsMutex);
|
||||
if (unloadUdf) {
|
||||
uv_cond_destroy(&udf->condReady);
|
||||
uv_mutex_destroy(&udf->lock);
|
||||
if (udf->destroyFunc) {
|
||||
(udf->destroyFunc)();
|
||||
}
|
||||
uv_dlclose(&udf->lib);
|
||||
taosMemoryFree(udf);
|
||||
}
|
||||
taosMemoryFree(handle);
|
||||
|
||||
SUdfResponse response;
|
||||
SUdfResponse *rsp = &response;
|
||||
rsp->seqNum = request->seqNum;
|
||||
rsp->type = request->type;
|
||||
rsp->code = code;
|
||||
int32_t len = encodeUdfResponse(NULL, rsp);
|
||||
rsp->msgLen = len;
|
||||
void *bufBegin = taosMemoryMalloc(len);
|
||||
void *buf = bufBegin;
|
||||
encodeUdfResponse(&buf, rsp);
|
||||
uvUdf->output = uv_buf_init(bufBegin, len);
|
||||
|
||||
taosMemoryFree(uvUdf->input.base);
|
||||
return;
|
||||
}
|
||||
|
||||
void udfdProcessRequest(uv_work_t *req) {
|
||||
SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data);
|
||||
SUdfRequest request = {0};
|
||||
|
@ -147,172 +323,16 @@ void udfdProcessRequest(uv_work_t *req) {
|
|||
|
||||
switch (request.type) {
|
||||
case UDF_TASK_SETUP: {
|
||||
// TODO: tracable id from client. connect, setup, call, teardown
|
||||
fnInfo("%" PRId64 " setup request. udf name: %s", request.seqNum, request.setup.udfName);
|
||||
SUdfSetupRequest *setup = &request.setup;
|
||||
|
||||
SUdf *udf = NULL;
|
||||
uv_mutex_lock(&global.udfsMutex);
|
||||
SUdf **udfInHash = taosHashGet(global.udfsHash, request.setup.udfName, strlen(request.setup.udfName));
|
||||
if (udfInHash) {
|
||||
++(*udfInHash)->refCount;
|
||||
udf = *udfInHash;
|
||||
uv_mutex_unlock(&global.udfsMutex);
|
||||
} else {
|
||||
SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
|
||||
udfNew->refCount = 1;
|
||||
udfNew->state = UDF_STATE_INIT;
|
||||
|
||||
uv_mutex_init(&udfNew->lock);
|
||||
uv_cond_init(&udfNew->condReady);
|
||||
udf = udfNew;
|
||||
taosHashPut(global.udfsHash, request.setup.udfName, strlen(request.setup.udfName), &udfNew, sizeof(&udfNew));
|
||||
uv_mutex_unlock(&global.udfsMutex);
|
||||
}
|
||||
|
||||
uv_mutex_lock(&udf->lock);
|
||||
if (udf->state == UDF_STATE_INIT) {
|
||||
udf->state = UDF_STATE_LOADING;
|
||||
udfdLoadUdf(setup->udfName, udf);
|
||||
if (udf->initFunc) {
|
||||
udf->initFunc();
|
||||
}
|
||||
udf->state = UDF_STATE_READY;
|
||||
uv_cond_broadcast(&udf->condReady);
|
||||
uv_mutex_unlock(&udf->lock);
|
||||
} else {
|
||||
while (udf->state != UDF_STATE_READY) {
|
||||
uv_cond_wait(&udf->condReady, &udf->lock);
|
||||
}
|
||||
uv_mutex_unlock(&udf->lock);
|
||||
}
|
||||
SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle));
|
||||
handle->udf = udf;
|
||||
SUdfResponse rsp;
|
||||
rsp.seqNum = request.seqNum;
|
||||
rsp.type = request.type;
|
||||
rsp.code = 0;
|
||||
rsp.setupRsp.udfHandle = (int64_t)(handle);
|
||||
rsp.setupRsp.outputType = udf->outputType;
|
||||
rsp.setupRsp.outputLen = udf->outputLen;
|
||||
rsp.setupRsp.bufSize = udf->bufSize;
|
||||
int32_t len = encodeUdfResponse(NULL, &rsp);
|
||||
rsp.msgLen = len;
|
||||
void *bufBegin = taosMemoryMalloc(len);
|
||||
void *buf = bufBegin;
|
||||
encodeUdfResponse(&buf, &rsp);
|
||||
|
||||
uvUdf->output = uv_buf_init(bufBegin, len);
|
||||
|
||||
taosMemoryFree(uvUdf->input.base);
|
||||
udfdProcessSetupRequest(uvUdf, &request);
|
||||
break;
|
||||
}
|
||||
|
||||
case UDF_TASK_CALL: {
|
||||
SUdfCallRequest *call = &request.call;
|
||||
fnDebug("%" PRId64 "call request. call type %d, handle: %" PRIx64, request.seqNum, call->callType,
|
||||
call->udfHandle);
|
||||
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle);
|
||||
SUdf *udf = handle->udf;
|
||||
SUdfResponse response = {0};
|
||||
SUdfResponse *rsp = &response;
|
||||
SUdfCallResponse *subRsp = &rsp->callRsp;
|
||||
|
||||
switch(call->callType) {
|
||||
case TSDB_UDF_CALL_SCALA_PROC: {
|
||||
SUdfColumn output = {0};
|
||||
|
||||
SUdfDataBlock input = {0};
|
||||
convertDataBlockToUdfDataBlock(&call->block, &input);
|
||||
udf->scalarProcFunc(&input, &output);
|
||||
|
||||
convertUdfColumnToDataBlock(&output, &response.callRsp.resultData);
|
||||
freeUdfColumn(&output);
|
||||
break;
|
||||
}
|
||||
case TSDB_UDF_CALL_AGG_INIT: {
|
||||
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
|
||||
.bufLen= udf->bufSize,
|
||||
.numOfResult = 0};
|
||||
udf->aggStartFunc(&outBuf);
|
||||
subRsp->resultBuf = outBuf;
|
||||
break;
|
||||
}
|
||||
case TSDB_UDF_CALL_AGG_PROC: {
|
||||
SUdfDataBlock input = {0};
|
||||
convertDataBlockToUdfDataBlock(&call->block, &input);
|
||||
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
|
||||
.bufLen= udf->bufSize,
|
||||
.numOfResult = 0};
|
||||
udf->aggProcFunc(&input, &call->interBuf, &outBuf);
|
||||
subRsp->resultBuf = outBuf;
|
||||
|
||||
break;
|
||||
}
|
||||
case TSDB_UDF_CALL_AGG_FIN: {
|
||||
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
|
||||
.bufLen= udf->bufSize,
|
||||
.numOfResult = 0};
|
||||
udf->aggFinishFunc(&call->interBuf, &outBuf);
|
||||
subRsp->resultBuf = outBuf;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
rsp->seqNum = request.seqNum;
|
||||
rsp->type = request.type;
|
||||
rsp->code = 0;
|
||||
subRsp->callType = call->callType;
|
||||
|
||||
int32_t len = encodeUdfResponse(NULL, rsp);
|
||||
rsp->msgLen = len;
|
||||
void *bufBegin = taosMemoryMalloc(len);
|
||||
void *buf = bufBegin;
|
||||
encodeUdfResponse(&buf, rsp);
|
||||
uvUdf->output = uv_buf_init(bufBegin, len);
|
||||
|
||||
taosMemoryFree(uvUdf->input.base);
|
||||
udfdProcessCallRequest(uvUdf, &request);
|
||||
break;
|
||||
}
|
||||
case UDF_TASK_TEARDOWN: {
|
||||
SUdfTeardownRequest *teardown = &request.teardown;
|
||||
fnInfo("teardown. %" PRId64 "handle:%" PRIx64, request.seqNum, teardown->udfHandle) SUdfcFuncHandle *handle =
|
||||
(SUdfcFuncHandle *)(teardown->udfHandle);
|
||||
SUdf *udf = handle->udf;
|
||||
bool unloadUdf = false;
|
||||
uv_mutex_lock(&global.udfsMutex);
|
||||
udf->refCount--;
|
||||
if (udf->refCount == 0) {
|
||||
unloadUdf = true;
|
||||
taosHashRemove(global.udfsHash, udf->name, strlen(udf->name));
|
||||
}
|
||||
uv_mutex_unlock(&global.udfsMutex);
|
||||
if (unloadUdf) {
|
||||
uv_cond_destroy(&udf->condReady);
|
||||
uv_mutex_destroy(&udf->lock);
|
||||
if (udf->destroyFunc) {
|
||||
(udf->destroyFunc)();
|
||||
}
|
||||
uv_dlclose(&udf->lib);
|
||||
taosMemoryFree(udf);
|
||||
}
|
||||
taosMemoryFree(handle);
|
||||
|
||||
SUdfResponse response;
|
||||
SUdfResponse *rsp = &response;
|
||||
rsp->seqNum = request.seqNum;
|
||||
rsp->type = request.type;
|
||||
rsp->code = 0;
|
||||
int32_t len = encodeUdfResponse(NULL, rsp);
|
||||
rsp->msgLen = len;
|
||||
void *bufBegin = taosMemoryMalloc(len);
|
||||
void *buf = bufBegin;
|
||||
encodeUdfResponse(&buf, rsp);
|
||||
uvUdf->output = uv_buf_init(bufBegin, len);
|
||||
|
||||
taosMemoryFree(uvUdf->input.base);
|
||||
udfdProcessTeardownRequest(uvUdf, &request);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
|
|
|
@ -27,6 +27,12 @@ int32_t udf2_start(SUdfInterBuf *buf) {
|
|||
int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
|
||||
int64_t sumSquares = *(int64_t*)interBuf->buf;
|
||||
int8_t numOutput = 0;
|
||||
for (int32_t i = 0; i < block->numOfCols; ++i) {
|
||||
SUdfColumn* col = block->udfCols[i];
|
||||
if (col->colMeta.type != TSDB_DATA_TYPE_INT) {
|
||||
return TSDB_CODE_UDF_INVALID_INPUT;
|
||||
}
|
||||
}
|
||||
for (int32_t i = 0; i < block->numOfCols; ++i) {
|
||||
for (int32_t j = 0; j < block->numOfRows; ++j) {
|
||||
SUdfColumn* col = block->udfCols[i];
|
||||
|
|
|
@ -153,11 +153,6 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra
|
|||
.handle = pInfo->msgInfo.handle,
|
||||
.persistHandle = persistHandle,
|
||||
.code = 0};
|
||||
if (pInfo->msgType == TDMT_VND_QUERY || pInfo->msgType == TDMT_VND_FETCH ||
|
||||
pInfo->msgType == TDMT_VND_QUERY_CONTINUE) {
|
||||
rpcMsg.persistHandle = 1;
|
||||
}
|
||||
|
||||
assert(pInfo->fp != NULL);
|
||||
|
||||
rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx);
|
||||
|
@ -168,7 +163,7 @@ int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransp
|
|||
return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, false, NULL);
|
||||
}
|
||||
|
||||
char *jobTaskStatusStr(int32_t status) {
|
||||
char* jobTaskStatusStr(int32_t status) {
|
||||
switch (status) {
|
||||
case JOB_TASK_STATUS_NULL:
|
||||
return "NULL";
|
||||
|
@ -197,13 +192,10 @@ char *jobTaskStatusStr(int32_t status) {
|
|||
|
||||
SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name) {
|
||||
SSchema s = {0};
|
||||
s.type = type;
|
||||
s.type = type;
|
||||
s.bytes = bytes;
|
||||
s.colId = colId;
|
||||
|
||||
tstrncpy(s.name, name, tListLen(s.name));
|
||||
return s;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -461,6 +461,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_CONNECT_ERR, "udf pipe connect erro
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_NO_PIPE, "udf no pipe")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_LOAD_UDF_FAILURE, "udf load failure")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_STATE, "udf invalid state")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_INPUT, "udf invalid function input")
|
||||
|
||||
//schemaless
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PROTOCOL_TYPE, "Invalid line protocol type")
|
||||
|
|
|
@ -93,104 +93,124 @@ class TDTestCase:
|
|||
res = tdSql.query(query_sql.replace('*', 'last(*)'), True)
|
||||
return int(res[0][-4])
|
||||
|
||||
def queryTsCol(self, tb_name):
|
||||
def queryTsCol(self, tb_name, check_elm=None):
|
||||
select_elm = "*" if check_elm is None else check_elm
|
||||
# ts and ts
|
||||
query_sql = f'select * from {tb_name} where ts > "2021-01-11 12:00:00" or ts < "2021-01-13 12:00:00"'
|
||||
query_sql = f'select {select_elm} from {tb_name} where ts > "2021-01-11 12:00:00" or ts < "2021-01-13 12:00:00"'
|
||||
tdSql.query(query_sql)
|
||||
tdSql.checkRows(11)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 11)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 11) if select_elm == "*" else False
|
||||
|
||||
query_sql = f'select * from {tb_name} where ts >= "2021-01-11 12:00:00" and ts <= "2021-01-13 12:00:00"'
|
||||
query_sql = f'select {select_elm} from {tb_name} where ts >= "2021-01-11 12:00:00" and ts <= "2021-01-13 12:00:00"'
|
||||
tdSql.query(query_sql)
|
||||
# tdSql.checkRows(2)
|
||||
# tdSql.checkEqual(self.queryLastC10(query_sql), 6)
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 6) if select_elm == "*" else False
|
||||
|
||||
## ts or and tinyint col
|
||||
query_sql = f'select * from {tb_name} where ts > "2021-01-11 12:00:00" or c1 = 2'
|
||||
tdSql.error(query_sql)
|
||||
query_sql = f'select {select_elm} from {tb_name} where ts > "2021-01-11 12:00:00" or c1 = 2'
|
||||
tdSql.query(query_sql)
|
||||
tdSql.checkRows(7)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 11) if select_elm == "*" else False
|
||||
|
||||
query_sql = f'select * from {tb_name} where ts <= "2021-01-11 12:00:00" and c1 != 2'
|
||||
query_sql = f'select {select_elm} from {tb_name} where ts <= "2021-01-11 12:00:00" and c1 != 2'
|
||||
tdSql.query(query_sql)
|
||||
tdSql.checkRows(4)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 5)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 5) if select_elm == "*" else False
|
||||
|
||||
## ts or and smallint col
|
||||
query_sql = f'select * from {tb_name} where ts <> "2021-01-11 12:00:00" or c2 = 10'
|
||||
tdSql.error(query_sql)
|
||||
query_sql = f'select {select_elm} from {tb_name} where ts <> "2021-01-11 12:00:00" or c2 = 10'
|
||||
tdSql.query(query_sql)
|
||||
tdSql.checkRows(10)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 11) if select_elm == "*" else False
|
||||
|
||||
query_sql = f'select * from {tb_name} where ts <= "2021-01-11 12:00:00" and c2 <= 1'
|
||||
|
||||
query_sql = f'select {select_elm} from {tb_name} where ts <= "2021-01-11 12:00:00" and c2 <= 1'
|
||||
tdSql.query(query_sql)
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 1)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 1) if select_elm == "*" else False
|
||||
|
||||
## ts or and int col
|
||||
query_sql = f'select * from {tb_name} where ts >= "2021-01-11 12:00:00" or c3 = 4'
|
||||
tdSql.error(query_sql)
|
||||
query_sql = f'select {select_elm} from {tb_name} where ts >= "2021-01-11 12:00:00" or c3 = 4'
|
||||
tdSql.query(query_sql)
|
||||
tdSql.checkRows(8)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 11) if select_elm == "*" else False
|
||||
|
||||
query_sql = f'select * from {tb_name} where ts < "2021-01-11 12:00:00" and c3 = 4'
|
||||
query_sql = f'select {select_elm} from {tb_name} where ts < "2021-01-11 12:00:00" and c3 = 4'
|
||||
tdSql.query(query_sql)
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 4)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 4) if select_elm == "*" else False
|
||||
|
||||
## ts or and big col
|
||||
query_sql = f'select * from {tb_name} where ts is Null or c4 = 5'
|
||||
tdSql.error(query_sql)
|
||||
|
||||
query_sql = f'select * from {tb_name} where ts is not Null and c4 = 2'
|
||||
query_sql = f'select {select_elm} from {tb_name} where ts is Null or c4 = 5'
|
||||
tdSql.query(query_sql)
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 3)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 5) if select_elm == "*" else False
|
||||
|
||||
query_sql = f'select {select_elm} from {tb_name} where ts is not Null and c4 = 2'
|
||||
tdSql.query(query_sql)
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 3) if select_elm == "*" else False
|
||||
|
||||
## ts or and float col
|
||||
query_sql = f'select * from {tb_name} where ts between "2021-01-17 12:00:00" and "2021-01-23 12:00:00" or c5 = 6.6'
|
||||
tdSql.error(query_sql)
|
||||
query_sql = f'select {select_elm} from {tb_name} where ts between "2021-01-17 12:00:00" and "2021-01-23 12:00:00" or c5 = 6.6'
|
||||
tdSql.query(query_sql)
|
||||
tdSql.checkRows(5)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 11) if select_elm == "*" else False
|
||||
|
||||
query_sql = f'select * from {tb_name} where ts < "2021-01-11 12:00:00" and c5 = 1.1'
|
||||
query_sql = f'select {select_elm} from {tb_name} where ts < "2021-01-11 12:00:00" and c5 = 1.1'
|
||||
tdSql.query(query_sql)
|
||||
tdSql.checkRows(4)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 4)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 4) if select_elm == "*" else False
|
||||
|
||||
## ts or and double col
|
||||
query_sql = f'select * from {tb_name} where ts between "2021-01-17 12:00:00" and "2021-01-23 12:00:00" or c6 = 7.7'
|
||||
tdSql.error(query_sql)
|
||||
query_sql = f'select {select_elm} from {tb_name} where ts between "2021-01-17 12:00:00" and "2021-01-23 12:00:00" or c6 = 7.7'
|
||||
tdSql.query(query_sql)
|
||||
tdSql.checkRows(5)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 11) if select_elm == "*" else False
|
||||
|
||||
query_sql = f'select * from {tb_name} where ts < "2021-01-11 12:00:00" and c6 = 1.1'
|
||||
query_sql = f'select {select_elm} from {tb_name} where ts < "2021-01-11 12:00:00" and c6 = 1.1'
|
||||
tdSql.query(query_sql)
|
||||
tdSql.checkRows(4)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 4)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 4) if select_elm == "*" else False
|
||||
|
||||
## ts or and binary col
|
||||
query_sql = f'select * from {tb_name} where ts < "2021-01-11 12:00:00" or c7 like "binary_"'
|
||||
tdSql.error(query_sql)
|
||||
|
||||
query_sql = f'select * from {tb_name} where ts <= "2021-01-11 12:00:00" and c7 in ("binary")'
|
||||
query_sql = f'select {select_elm} from {tb_name} where ts < "2021-01-11 12:00:00" or c7 like "binary_"'
|
||||
tdSql.query(query_sql)
|
||||
tdSql.checkRows(5)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 5)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 8) if select_elm == "*" else False
|
||||
|
||||
query_sql = f'select {select_elm} from {tb_name} where ts <= "2021-01-11 12:00:00" and c7 in ("binary")'
|
||||
tdSql.query(query_sql)
|
||||
tdSql.checkRows(5)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 5) if select_elm == "*" else False
|
||||
|
||||
## ts or and nchar col
|
||||
query_sql = f'select * from {tb_name} where ts < "2021-01-11 12:00:00" or c8 like "nchar%"'
|
||||
tdSql.error(query_sql)
|
||||
query_sql = f'select {select_elm} from {tb_name} where ts < "2021-01-11 12:00:00" or c8 like "nchar%"'
|
||||
tdSql.query(query_sql)
|
||||
tdSql.checkRows(10)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 10) if select_elm == "*" else False
|
||||
|
||||
query_sql = f'select * from {tb_name} where ts >= "2021-01-11 12:00:00" and c8 is Null'
|
||||
query_sql = f'select {select_elm} from {tb_name} where ts >= "2021-01-11 12:00:00" and c8 is Null'
|
||||
tdSql.query(query_sql)
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 11)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 11) if select_elm == "*" else False
|
||||
|
||||
## ts or and bool col
|
||||
query_sql = f'select * from {tb_name} where ts < "2021-01-11 12:00:00" or c9=false'
|
||||
tdSql.error(query_sql)
|
||||
query_sql = f'select {select_elm} from {tb_name} where ts < "2021-01-11 12:00:00" or c9=false'
|
||||
tdSql.query(query_sql)
|
||||
tdSql.checkRows(6)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 11) if select_elm == "*" else False
|
||||
|
||||
query_sql = f'select * from {tb_name} where ts >= "2021-01-11 12:00:00" and c9=true'
|
||||
query_sql = f'select {select_elm} from {tb_name} where ts >= "2021-01-11 12:00:00" and c9=true'
|
||||
tdSql.query(query_sql)
|
||||
tdSql.checkRows(5)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 9)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 9) if select_elm == "*" else False
|
||||
|
||||
## multi cols
|
||||
query_sql = f'select * from {tb_name} where ts > "2021-01-03 12:00:00" and c1 != 2 and c2 >= 2 and c3 <> 4 and c4 < 4 and c5 > 1 and c6 >= 1.1 and c7 is not Null and c8 = "nchar" and c9=false'
|
||||
query_sql = f'select {select_elm} from {tb_name} where ts > "2021-01-03 12:00:00" and c1 != 2 and c2 >= 2 and c3 <> 4 and c4 < 4 and c5 > 1 and c6 >= 1.1 and c7 is not Null and c8 = "nchar" and c9=false'
|
||||
tdSql.query(query_sql)
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 10)
|
||||
tdSql.checkEqual(self.queryLastC10(query_sql), 10) if select_elm == "*" else False
|
||||
|
||||
def queryTsTag(self, tb_name):
|
||||
## ts and tinyint col
|
||||
|
@ -2029,12 +2049,12 @@ class TDTestCase:
|
|||
tb_name = self.initStb()
|
||||
self.queryFullTagType(tb_name)
|
||||
|
||||
def checkTbTsCol(self):
|
||||
def checkTbTsCol(self, check_elm):
|
||||
'''
|
||||
Ordinary table ts and col check
|
||||
'''
|
||||
tb_name = self.initTb()
|
||||
self.queryTsCol(tb_name)
|
||||
self.queryTsCol(tb_name, check_elm)
|
||||
|
||||
def checkStbTsTol(self):
|
||||
tb_name = self.initStb()
|
||||
|
@ -2112,8 +2132,8 @@ class TDTestCase:
|
|||
for check_elm in [None, column_name]:
|
||||
self.checkTbColTypeOperator(check_elm)
|
||||
self.checkStbColTypeOperator(check_elm)
|
||||
self.checkTbTsCol(check_elm)
|
||||
# self.checkStbTagTypeOperator()
|
||||
# self.checkTbTsCol()
|
||||
# self.checkStbTsTol()
|
||||
# self.checkStbTsTag()
|
||||
# self.checkStbTsColTag()
|
||||
|
|
|
@ -13,14 +13,12 @@ from util.dnodes import *
|
|||
|
||||
class TDTestCase:
|
||||
hostname = socket.gethostname()
|
||||
rpcDebugFlagVal = '143'
|
||||
clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
|
||||
clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
|
||||
|
||||
updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
|
||||
updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal
|
||||
|
||||
print ("===================: ", updatecfgDict)
|
||||
#rpcDebugFlagVal = '143'
|
||||
#clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
|
||||
#clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
|
||||
#updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
|
||||
#updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal
|
||||
#print ("===================: ", updatecfgDict)
|
||||
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
|
@ -43,27 +41,35 @@ class TDTestCase:
|
|||
break
|
||||
return buildPath
|
||||
|
||||
def create_tables(self,dbName,vgroups,stbName,ctbNum,rowsPerTbl):
|
||||
tdSql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
|
||||
tdSql.execute("use %s" %dbName)
|
||||
tdSql.execute("create table %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
|
||||
def newcur(self,cfg,host,port):
|
||||
user = "root"
|
||||
password = "taosdata"
|
||||
con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port)
|
||||
cur=con.cursor()
|
||||
print(cur)
|
||||
return cur
|
||||
|
||||
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl):
|
||||
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
|
||||
tsql.execute("use %s" %dbName)
|
||||
tsql.execute("create table %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
|
||||
pre_create = "create table"
|
||||
sql = pre_create
|
||||
#tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
|
||||
for i in range(ctbNum):
|
||||
sql += " %s_%d using %s tags(%d)"%(stbName,i,stbName,i+1)
|
||||
if (i > 0) and (i%100 == 0):
|
||||
tdSql.execute(sql)
|
||||
tsql.execute(sql)
|
||||
sql = pre_create
|
||||
if sql != pre_create:
|
||||
tdSql.execute(sql)
|
||||
tsql.execute(sql)
|
||||
|
||||
tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum))
|
||||
return
|
||||
|
||||
def insert_data(self,dbName,stbName,ctbNum,rowsPerTbl,startTs):
|
||||
def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs):
|
||||
tdLog.debug("start to insert data ............")
|
||||
tdSql.execute("use %s" %dbName)
|
||||
tsql.execute("use %s" %dbName)
|
||||
pre_insert = "insert into "
|
||||
sql = pre_insert
|
||||
|
||||
|
@ -72,31 +78,38 @@ class TDTestCase:
|
|||
sql += " %s_%d values "%(stbName,i)
|
||||
for j in range(rowsPerTbl):
|
||||
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
|
||||
if (j > 0) and (j%2000 == 0):
|
||||
tdSql.execute(sql)
|
||||
sql = "insert into %s_%d values " %(stbName,i)
|
||||
if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
|
||||
tsql.execute(sql)
|
||||
if j < rowsPerTbl - 1:
|
||||
sql = "insert into %s_%d values " %(stbName,i)
|
||||
else:
|
||||
sql = "insert into "
|
||||
#end sql
|
||||
if sql != pre_insert:
|
||||
# print(sql)
|
||||
print("sql:%s"%sql)
|
||||
tdSql.execute(sql)
|
||||
#print("insert sql:%s"%sql)
|
||||
tsql.execute(sql)
|
||||
tdLog.debug("insert data ............ [OK]")
|
||||
return
|
||||
|
||||
def prepareEnv(self, **parameterDict):
|
||||
print ("input parameters:")
|
||||
print (parameterDict)
|
||||
self.create_tables(parameterDict["dbName"],\
|
||||
# create new connector for my thread
|
||||
tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030)
|
||||
self.create_tables(tsql,\
|
||||
parameterDict["dbName"],\
|
||||
parameterDict["vgroups"],\
|
||||
parameterDict["stbName"],\
|
||||
parameterDict["ctbNum"],\
|
||||
parameterDict["rowsPerTbl"])
|
||||
|
||||
self.insert_data(parameterDict["dbName"],\
|
||||
parameterDict["stbName"],\
|
||||
parameterDict["ctbNum"],\
|
||||
parameterDict["rowsPerTbl"],\
|
||||
parameterDict["startTs"])
|
||||
self.insert_data(tsql,\
|
||||
parameterDict["dbName"],\
|
||||
parameterDict["stbName"],\
|
||||
parameterDict["ctbNum"],\
|
||||
parameterDict["rowsPerTbl"],\
|
||||
parameterDict["batchNum"],\
|
||||
parameterDict["startTs"])
|
||||
return
|
||||
|
||||
def run(self):
|
||||
|
@ -113,17 +126,114 @@ class TDTestCase:
|
|||
tdLog.printNoPrefix("======== test scenario 1: ")
|
||||
tdLog.info("step 1: create database, stb, ctb and insert data")
|
||||
# create and start thread
|
||||
parameterDict = {'dbName': 'db', \
|
||||
parameterDict = {'cfg': '', \
|
||||
'dbName': 'db', \
|
||||
'vgroups': 1, \
|
||||
'stbName': 'stb', \
|
||||
'ctbNum': 10, \
|
||||
'rowsPerTbl': 10, \
|
||||
'rowsPerTbl': 10000, \
|
||||
'batchNum': 10, \
|
||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||
parameterDict['cfg'] = cfgPath
|
||||
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||
prepareEnvThread.start()
|
||||
time.sleep(2)
|
||||
|
||||
# wait stb ready
|
||||
while 1:
|
||||
#tdSql.query("show %s.stables"%parameterDict['dbName'])
|
||||
tdSql.query("show db.stables")
|
||||
#print (self.queryResult)
|
||||
#print (tdSql.getRows())
|
||||
if tdSql.getRows() == 1:
|
||||
break
|
||||
else:
|
||||
time.sleep(1)
|
||||
|
||||
tdLog.info("create topics from super table")
|
||||
topicFromStb = 'topic_stb_column'
|
||||
topicFromCtb = 'topic_ctb_column'
|
||||
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
||||
time.sleep(1)
|
||||
tdSql.query("show topics")
|
||||
print ("======================================")
|
||||
#print (self.queryResult)
|
||||
#tdSql.checkRows(2)
|
||||
topic1 = tdSql.getData(0 , 0)
|
||||
topic2 = tdSql.getData(1 , 0)
|
||||
print (topic1)
|
||||
print (topic2)
|
||||
|
||||
print (topicFromStb)
|
||||
print (topicFromCtb)
|
||||
#tdLog.info("show topics: %s, %s"%topic1, topic2)
|
||||
#if topic1 != topicFromStb or topic1 != topicFromCtb:
|
||||
# tdLog.exit("topic error1")
|
||||
#if topic2 != topicFromStb or topic2 != topicFromCtb:
|
||||
# tdLog.exit("topic error2")
|
||||
|
||||
tdLog.info("create consume info table and consume result table")
|
||||
cdbName = parameterDict["dbName"]
|
||||
#tdSql.query("create database %s"%cdbName)
|
||||
#tdSql.query("use %s"%cdbName)
|
||||
tdSql.query("create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)")
|
||||
tdSql.query("create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)")
|
||||
|
||||
consumerId = 0
|
||||
expectmsgcnt = (parameterDict["rowsPerTbl"] / parameterDict["batchNum"] ) * parameterDict["ctbNum"]
|
||||
expectmsgcnt1 = expectmsgcnt + parameterDict["ctbNum"]
|
||||
topicList = topicFromStb
|
||||
ifcheckdata = 0
|
||||
keyList = 'group.id:cgrp1,\
|
||||
enable.auto.commit:false,\
|
||||
auto.commit.interval.ms:6000,\
|
||||
auto.offset.reset:earliest'
|
||||
sql = "insert into consumeinfo values "
|
||||
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectmsgcnt1, ifcheckdata)
|
||||
tdSql.query(sql)
|
||||
|
||||
tdLog.info("check stb if there are data")
|
||||
while 1:
|
||||
tdSql.query("select count(*) from %s"%parameterDict["stbName"])
|
||||
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
|
||||
countOfStb = tdSql.getData(0, 0)
|
||||
if countOfStb != 0:
|
||||
tdLog.info("count from stb: %d"%countOfStb)
|
||||
break
|
||||
else:
|
||||
time.sleep(1)
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
pollDelay = 5
|
||||
showMsg = 1
|
||||
showRow = 1
|
||||
|
||||
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
|
||||
shellCmd += "> /dev/null 2>&1 &"
|
||||
tdLog.info(shellCmd)
|
||||
os.system(shellCmd)
|
||||
|
||||
# wait for data ready
|
||||
prepareEnvThread.join()
|
||||
prepareEnvThread.join()
|
||||
|
||||
tdLog.info("insert process end, and start to check consume result")
|
||||
while 1:
|
||||
tdSql.query("select * from consumeresult")
|
||||
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
|
||||
if tdSql.getRows() == 1:
|
||||
break
|
||||
else:
|
||||
time.sleep(5)
|
||||
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||
|
||||
tdSql.checkData(0 , 1, consumerId)
|
||||
tdSql.checkData(0 , 2, expectmsgcnt)
|
||||
tdSql.checkData(0 , 3, expectrowcnt)
|
||||
|
||||
tdLog.printNoPrefix("======== test scenario 2: ")
|
||||
|
||||
|
|
|
@ -51,3 +51,7 @@ python3 ./test.py -f 2-query/arcsin.py
|
|||
python3 ./test.py -f 2-query/arccos.py
|
||||
python3 ./test.py -f 2-query/arctan.py
|
||||
# python3 ./test.py -f 2-query/query_cols_tags_and_or.py
|
||||
|
||||
python3 ./test.py -f 7-tmq/basic5.py
|
||||
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue