Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/alter_table

This commit is contained in:
Hongze Cheng 2022-05-14 14:40:18 +00:00
commit da75e3d19f
41 changed files with 690 additions and 246 deletions

View File

@ -59,10 +59,11 @@ static FORCE_INLINE int64_t taosGetTimestamp(int32_t precision) {
* precision == TSDB_TIME_PRECISION_NANO, it returns timestamp in nanosecond.
*/
static FORCE_INLINE int64_t taosGetTimestampToday(int32_t precision) {
int64_t factor = (precision == TSDB_TIME_PRECISION_MILLI) ? 1000 :
(precision == TSDB_TIME_PRECISION_MICRO) ? 1000000 : 1000000000;
time_t t = taosTime(NULL);
struct tm * tm= taosLocalTime(&t, NULL);
int64_t factor = (precision == TSDB_TIME_PRECISION_MILLI) ? 1000
: (precision == TSDB_TIME_PRECISION_MICRO) ? 1000000
: 1000000000;
time_t t = taosTime(NULL);
struct tm* tm = taosLocalTime(&t, NULL);
tm->tm_hour = 0;
tm->tm_min = 0;
tm->tm_sec = 0;
@ -79,13 +80,13 @@ int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* durati
int32_t taosParseTime(const char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth);
void deltaToUtcInitOnce();
char getPrecisionUnit(int32_t precision);
char getPrecisionUnit(int32_t precision);
int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision);
int64_t convertTimeFromPrecisionToUnit(int64_t time, int32_t fromPrecision, char toUnit);
int32_t convertStringToTimestamp(int16_t type, char *inputData, int64_t timePrec, int64_t *timeVal);
int32_t convertStringToTimestamp(int16_t type, char* inputData, int64_t timePrec, int64_t* timeVal);
void taosFormatUtcTime(char *buf, int32_t bufLen, int64_t time, int32_t precision);
void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t time, int32_t precision);
#ifdef __cplusplus
}

View File

@ -51,14 +51,12 @@ typedef struct STableComInfo {
} STableComInfo;
typedef struct SIndexMeta {
#ifdef WINDOWS
size_t avoidCompilationErrors;
#endif
} SIndexMeta;
/*
* ASSERT(sizeof(SCTableMeta) == 24)
* ASSERT(tableType == TSDB_CHILD_TABLE)
@ -95,7 +93,7 @@ typedef struct SDBVgInfo {
int32_t vgVersion;
int8_t hashMethod;
int32_t numOfTable; // DB's table num, unit is TSDB_TABLE_NUM_UNIT
SHashObj *vgHash; //key:vgId, value:SVgroupInfo
SHashObj* vgHash; // key:vgId, value:SVgroupInfo
} SDBVgInfo;
typedef struct SUseDbOutput {
@ -135,7 +133,7 @@ typedef struct SMsgSendInfo {
} SMsgSendInfo;
typedef struct SQueryNodeStat {
int32_t tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT
int32_t tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT
} SQueryNodeStat;
int32_t initTaskQueue();
@ -172,7 +170,7 @@ const SSchema* tGetTbnameColumnSchema();
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta** pMeta);
char *jobTaskStatusStr(int32_t status);
char* jobTaskStatusStr(int32_t status);
SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name);
@ -184,62 +182,87 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE
#define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE
#define NEED_CLIENT_RM_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_PAR_TABLE_NOT_EXIST || (_code) == TSDB_CODE_VND_TB_NOT_EXIST)
#define NEED_CLIENT_REFRESH_VG_ERROR(_code) ((_code) == TSDB_CODE_VND_HASH_MISMATCH || (_code) == TSDB_CODE_VND_INVALID_VGROUP_ID)
#define NEED_CLIENT_RM_TBLMETA_ERROR(_code) \
((_code) == TSDB_CODE_PAR_TABLE_NOT_EXIST || (_code) == TSDB_CODE_VND_TB_NOT_EXIST)
#define NEED_CLIENT_REFRESH_VG_ERROR(_code) \
((_code) == TSDB_CODE_VND_HASH_MISMATCH || (_code) == TSDB_CODE_VND_INVALID_VGROUP_ID)
#define NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_TABLE_RECREATED)
#define NEED_CLIENT_HANDLE_ERROR(_code) (NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
#define NEED_CLIENT_HANDLE_ERROR(_code) \
(NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \
NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
#define NEED_SCHEDULER_RETRY_ERROR(_code) ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define NEED_SCHEDULER_RETRY_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define REQUEST_MAX_TRY_TIMES 5
#define qFatal(...) \
do { \
if (qDebugFlag & DEBUG_FATAL) { \
taosPrintLog("QRY FATAL ", DEBUG_FATAL, qDebugFlag, __VA_ARGS__); \
} \
#define qFatal(...) \
do { \
if (qDebugFlag & DEBUG_FATAL) { \
taosPrintLog("QRY FATAL ", DEBUG_FATAL, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qError(...) \
do { \
if (qDebugFlag & DEBUG_ERROR) { \
taosPrintLog("QRY ERROR ", DEBUG_ERROR, qDebugFlag, __VA_ARGS__); \
} \
#define qError(...) \
do { \
if (qDebugFlag & DEBUG_ERROR) { \
taosPrintLog("QRY ERROR ", DEBUG_ERROR, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qWarn(...) \
do { \
if (qDebugFlag & DEBUG_WARN) { \
taosPrintLog("QRY WARN ", DEBUG_WARN, qDebugFlag, __VA_ARGS__); \
} \
#define qWarn(...) \
do { \
if (qDebugFlag & DEBUG_WARN) { \
taosPrintLog("QRY WARN ", DEBUG_WARN, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qInfo(...) \
do { \
if (qDebugFlag & DEBUG_INFO) { \
taosPrintLog("QRY ", DEBUG_INFO, qDebugFlag, __VA_ARGS__); \
} \
#define qInfo(...) \
do { \
if (qDebugFlag & DEBUG_INFO) { \
taosPrintLog("QRY ", DEBUG_INFO, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qDebug(...) \
do { \
if (qDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \
} \
#define qDebug(...) \
do { \
if (qDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("QRY ", DEBUG_DEBUG, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qTrace(...) \
do { \
if (qDebugFlag & DEBUG_TRACE) { \
taosPrintLog("QRY ", DEBUG_TRACE, qDebugFlag, __VA_ARGS__); \
} \
#define qTrace(...) \
do { \
if (qDebugFlag & DEBUG_TRACE) { \
taosPrintLog("QRY ", DEBUG_TRACE, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qDebugL(...) \
do { \
if (qDebugFlag & DEBUG_DEBUG) { \
taosPrintLongString("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \
} \
#define qDebugL(...) \
do { \
if (qDebugFlag & DEBUG_DEBUG) { \
taosPrintLongString("QRY ", DEBUG_DEBUG, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define QRY_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define QRY_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define QRY_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define QRY_ERR_RET(c) \
do { \
int32_t _code = c; \
if (_code != TSDB_CODE_SUCCESS) { \
terrno = _code; \
return _code; \
} \
} while (0)
#define QRY_RET(c) \
do { \
int32_t _code = c; \
if (_code != TSDB_CODE_SUCCESS) { \
terrno = _code; \
} \
return _code; \
} while (0)
#define QRY_ERR_JRET(c) \
do { \
code = c; \
if (code != TSDB_CODE_SUCCESS) { \
terrno = code; \
goto _return; \
} \
} while (0)
#ifdef __cplusplus
}

View File

@ -636,6 +636,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_PERMISSION_DENIED TAOS_DEF_ERROR_CODE(0, 0x2644)
#define TSDB_CODE_PAR_INVALID_STREAM_QUERY TAOS_DEF_ERROR_CODE(0, 0x2645)
#define TSDB_CODE_PAR_INVALID_INTERNAL_PK TAOS_DEF_ERROR_CODE(0, 0x2646)
#define TSDB_CODE_PAR_INVALID_TIMELINE_FUNC TAOS_DEF_ERROR_CODE(0, 0x2647)
#define TSDB_CODE_PAR_INVALID_PASSWD TAOS_DEF_ERROR_CODE(0, 0x2648)
//planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
@ -657,6 +659,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_UDF_LOAD_UDF_FAILURE TAOS_DEF_ERROR_CODE(0, 0x2905)
#define TSDB_CODE_UDF_INVALID_STATE TAOS_DEF_ERROR_CODE(0, 0x2906)
#define TSDB_CODE_UDF_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x2907)
#define TSDB_CODE_UDF_NO_FUNC_HANDLE TAOS_DEF_ERROR_CODE(0, 0x2908)
#define TSDB_CODE_SML_INVALID_PROTOCOL_TYPE TAOS_DEF_ERROR_CODE(0, 0x3000)
#define TSDB_CODE_SML_INVALID_PRECISION_TYPE TAOS_DEF_ERROR_CODE(0, 0x3001)

View File

@ -67,7 +67,6 @@ bin_files="${compile_dir}/build/bin/taosd ${compile_dir}/build/bin/taos ${compi
cp -rf ${bin_files} ${install_dir}/bin && chmod a+x ${install_dir}/bin/* || :
cp ${compile_dir}/build/lib/libtaos.so ${install_dir}/lib/
cp ${compile_dir}/build/lib/libtdb.so ${install_dir}/lib/
cp ${compile_dir}/build/lib/libavro* ${install_dir}/lib/ > /dev/null || echo -e "failed to copy avro libraries"
cp -rf ${compile_dir}/build/lib/pkgconfig ${install_dir}/lib/ > /dev/null || echo -e "failed to copy pkgconfig directory"

View File

@ -215,15 +215,9 @@ function install_lib() {
${csudo} ln -s ${install_main_dir}/lib/libtaos.* ${lib_link_dir}/libtaos.so.1
${csudo} ln -s ${lib_link_dir}/libtaos.so.1 ${lib_link_dir}/libtaos.so
${csudo} ln -s ${install_main_dir}/lib/libtdb.* ${lib_link_dir}/libtdb.so.1
${csudo} ln -s ${lib_link_dir}/libtdb.so.1 ${lib_link_dir}/libtdb.so
if [[ -d ${lib64_link_dir} && ! -e ${lib64_link_dir}/libtaos.so ]]; then
${csudo} ln -s ${install_main_dir}/lib/libtaos.* ${lib64_link_dir}/libtaos.so.1 || :
${csudo} ln -s ${lib64_link_dir}/libtaos.so.1 ${lib64_link_dir}/libtaos.so || :
${csudo} ln -s ${install_main_dir}/lib/libtdb.* ${lib64_link_dir}/libtdb.so.1 || :
${csudo} ln -s ${lib64_link_dir}/libtdb.so.1 ${lib64_link_dir}/libtdb.so || :
fi
${csudo} ldconfig

View File

@ -172,7 +172,8 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.pTransporter = pTscObj->pAppInfo->pTransporter,
.pStmtCb = pStmtCb,
.pUser = pTscObj->user};
.pUser = pTscObj->user,
.isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER))};
cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
@ -947,8 +948,7 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR
// TODO handle the compressed case
pResultInfo->totalRows += pResultInfo->numOfRows;
return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows,
convertUcs4);
return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows, convertUcs4);
}
TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) {

View File

@ -2769,20 +2769,8 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
} while (moveToNextRowInMem(pCheckInfo));
taosMemoryFreeClear(pSchema); // free the STSChema
assert(numOfRows <= maxRowsToRead);
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && numOfRows < maxRowsToRead) {
int32_t emptySize = maxRowsToRead - numOfRows;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes,
numOfRows * pColInfo->info.bytes);
}
}
int64_t elapsedTime = taosGetTimestampUs() - st;
tsdbDebug("%p build data block from cache completed, elapsed time:%" PRId64 " us, numOfRows:%d, numOfCols:%d, %s",
pTsdbReadHandle, elapsedTime, numOfRows, numOfCols, pTsdbReadHandle->idStr);

View File

@ -651,7 +651,7 @@ void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag);
void doSetOperatorCompleted(SOperatorInfo* pOperator);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SArray* pColMatchInfo);
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset);
void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols);
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow);

View File

@ -2115,7 +2115,7 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
}
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SArray* pColMatchInfo) {
if (pFilterNode == NULL) {
return;
}
@ -2129,8 +2129,9 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
code = filterSetDataFromSlotId(filter, &param1);
int8_t* rowRes = NULL;
// todo the keep seems never to be True??
bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols);
bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols);
filterFreeInfo(filter);
extractQualifiedTupleByFilterResult(pBlock, rowRes, keep);
@ -2152,11 +2153,6 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, i);
// For the reserved column, the value is not filled yet, so the whole column data may be NULL.
if (pSrc->pData == NULL) {
continue;
}
int32_t numOfRows = 0;
for (int32_t j = 0; j < totalRows; ++j) {
if (rowRes[j] == 0) {

View File

@ -318,7 +318,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
while(1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes);
doFilter(pInfo->pCondition, pRes, NULL);
bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo);
if (!hasRemain) {

View File

@ -398,6 +398,10 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
output->status = SFLT_ACCURATE_INDEX;
}
if (ctx->noExec) {
SIF_RET(code);
}
return operFn(&params[0], nParam > 1 ? &params[1] : NULL, output);
_return:
taosMemoryFree(params);

View File

@ -159,6 +159,8 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
return false;
}
static void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock);
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
uint32_t* status) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -238,8 +240,15 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
}
relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols);
// currently only the tbname pseudo column
if (pTableScanInfo->numOfPseudoExpr > 0) {
addTagPseudoColumnData(pTableScanInfo, pBlock);
}
// todo record the filter time cost
doFilter(pTableScanInfo->pFilterNode, pBlock);
doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo);
if (pBlock->info.rows == 0) {
pCost->filterOutBlocks += 1;
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
@ -260,7 +269,7 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
pTableScanInfo->cond.order = TSDB_ORDER_DESC;
}
static void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
// currently only the tbname pseudo column
if (pTableScanInfo->numOfPseudoExpr == 0) {
return;
@ -330,11 +339,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
continue;
}
// currently only the tbname pseudo column
if (pTableScanInfo->numOfPseudoExpr > 0) {
addTagPseudoColumnData(pTableScanInfo, pBlock);
}
return pBlock;
}
@ -750,7 +754,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
return NULL;
}
rows = pBlockInfo->rows;
doFilter(pInfo->pCondition, pInfo->pRes);
doFilter(pInfo->pCondition, pInfo->pRes, NULL);
break;
}

View File

@ -249,7 +249,7 @@ TEST(testCase, index_filter_varify) {
sifMakeOpNode(&opNode, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_DOUBLE, pLeft, pRight);
SIdxFltStatus st = idxGetFltStatus(opNode);
EXPECT_EQ(st, SFLT_COARSE_INDEX);
EXPECT_EQ(st, SFLT_ACCURATE_INDEX);
nodesDestroyNode(res);
}
{
@ -269,7 +269,7 @@ TEST(testCase, index_filter_varify) {
sifMakeOpNode(&opNode, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_DOUBLE, pLeft, pRight);
SIdxFltStatus st = idxGetFltStatus(opNode);
EXPECT_EQ(st, SFLT_COARSE_INDEX);
EXPECT_EQ(st, SFLT_ACCURATE_INDEX);
nodesDestroyNode(res);
}
}

View File

@ -1072,6 +1072,8 @@ int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) {
int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
fnTrace("event loop start uv task. task: %d, %p", uvTask->type, uvTask);
int32_t code = 0;
switch (uvTask->type) {
case UV_TASK_CONNECT: {
uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t));
@ -1091,22 +1093,34 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t));
connReq->data = uvTask;
uv_pipe_connect(connReq, pipe, uvTask->udfc->udfdPipeName, onUdfcPipeConnect);
code = 0;
break;
}
case UV_TASK_REQ_RSP: {
uv_pipe_t *pipe = uvTask->pipe;
uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t));
write->data = uvTask;
int err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipetWrite);
if (err != 0) {
fnError("udfc event loop start req/rsp task uv_write failed. code: %s", uv_strerror(err));
if (pipe == NULL) {
code = TSDB_CODE_UDF_PIPE_NO_PIPE;
} else {
uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t));
write->data = uvTask;
int err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipetWrite);
if (err != 0) {
fnError("udfc event loop start req/rsp task uv_write failed. code: %s", uv_strerror(err));
}
code = err;
}
break;
}
case UV_TASK_DISCONNECT: {
SClientUvConn *conn = uvTask->pipe->data;
QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
uv_close((uv_handle_t *) uvTask->pipe, onUdfcPipeClose);
uv_pipe_t *pipe = uvTask->pipe;
if (pipe == NULL) {
code = TSDB_CODE_UDF_PIPE_NO_PIPE;
} else {
SClientUvConn *conn = pipe->data;
QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
uv_close((uv_handle_t *)uvTask->pipe, onUdfcPipeClose);
code = 0;
}
break;
}
default: {
@ -1115,7 +1129,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
}
}
return 0;
return code;
}
void udfClientAsyncCb(uv_async_t *async) {
@ -1133,6 +1147,9 @@ void udfClientAsyncCb(uv_async_t *async) {
int32_t code = udfcStartUvTask(task);
if (code == 0) {
QUEUE_INSERT_TAIL(&udfc->uvProcTaskQueue, &task->procTaskQueue);
} else {
task->errCode = code;
uv_sem_post(&task->taskSem);
}
}
@ -1483,6 +1500,9 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
SClientUdfUvSession *session = udfRes->session;
if (session == NULL) {
return TSDB_CODE_UDF_NO_FUNC_HANDLE;
}
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
@ -1535,6 +1555,9 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
SClientUdfUvSession *session = udfRes->session;
if (session == NULL) {
return TSDB_CODE_UDF_NO_FUNC_HANDLE;
}
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;

View File

@ -37,6 +37,8 @@ TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b);
_cache_range_compare indexGetCompare(RangeType ty);
int32_t indexConvertData(void* src, int8_t type, void** dst);
#ifdef __cplusplus
}
#endif

View File

@ -46,9 +46,7 @@ typedef struct SIndexStat {
} SIndexStat;
struct SIndex {
#ifdef USE_LUCENE
index_t* index;
#endif
int64_t refId;
void* cache;
void* tindex;
SHashObj* colObj; // < field name, field id>
@ -124,6 +122,11 @@ typedef struct TFileCacheKey {
int indexFlushCacheToTFile(SIndex* sIdx, void*);
int64_t indexAddRef(void* p);
int32_t indexRemoveRef(int64_t ref);
void indexAcquireRef(int64_t ref);
void indexReleaseRef(int64_t ref);
int32_t indexSerialCacheKey(ICacheKey* key, char* buf);
// int32_t indexSerialKey(ICacheKey* key, char* buf);
// int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);

View File

@ -19,7 +19,10 @@
#include "indexInt.h"
#include "indexTfile.h"
#include "indexUtil.h"
#include "tcoding.h"
#include "tdataformat.h"
#include "tdef.h"
#include "tref.h"
#include "tsched.h"
#ifdef USE_LUCENE
@ -27,36 +30,40 @@
#endif
#define INDEX_NUM_OF_THREADS 4
#define INDEX_QUEUE_SIZE 200
#define INDEX_QUEUE_SIZE 200
void* indexQhandle = NULL;
#define INDEX_DATA_BOOL_NULL 0x02
#define INDEX_DATA_TINYINT_NULL 0x80
#define INDEX_DATA_SMALLINT_NULL 0x8000
#define INDEX_DATA_INT_NULL 0x80000000L
#define INDEX_DATA_BIGINT_NULL 0x8000000000000000L
#define INDEX_DATA_BOOL_NULL 0x02
#define INDEX_DATA_TINYINT_NULL 0x80
#define INDEX_DATA_SMALLINT_NULL 0x8000
#define INDEX_DATA_INT_NULL 0x80000000L
#define INDEX_DATA_BIGINT_NULL 0x8000000000000000L
#define INDEX_DATA_TIMESTAMP_NULL TSDB_DATA_BIGINT_NULL
#define INDEX_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN
#define INDEX_DATA_DOUBLE_NULL 0x7FFFFF0000000000L // an NAN
#define INDEX_DATA_NCHAR_NULL 0xFFFFFFFF
#define INDEX_DATA_BINARY_NULL 0xFF
#define INDEX_DATA_JSON_NULL 0xFFFFFFFF
#define INDEX_DATA_JSON_null 0xFFFFFFFE
#define INDEX_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN
#define INDEX_DATA_DOUBLE_NULL 0x7FFFFF0000000000L // an NAN
#define INDEX_DATA_NCHAR_NULL 0xFFFFFFFF
#define INDEX_DATA_BINARY_NULL 0xFF
#define INDEX_DATA_JSON_NULL 0xFFFFFFFF
#define INDEX_DATA_JSON_null 0xFFFFFFFE
#define INDEX_DATA_JSON_NOT_NULL 0x01
#define INDEX_DATA_UTINYINT_NULL 0xFF
#define INDEX_DATA_UTINYINT_NULL 0xFF
#define INDEX_DATA_USMALLINT_NULL 0xFFFF
#define INDEX_DATA_UINT_NULL 0xFFFFFFFF
#define INDEX_DATA_UBIGINT_NULL 0xFFFFFFFFFFFFFFFFL
#define INDEX_DATA_UINT_NULL 0xFFFFFFFF
#define INDEX_DATA_UBIGINT_NULL 0xFFFFFFFFFFFFFFFFL
#define INDEX_DATA_NULL_STR "NULL"
#define INDEX_DATA_NULL_STR "NULL"
#define INDEX_DATA_NULL_STR_L "null"
void* indexQhandle = NULL;
int32_t indexRefMgt;
static void indexDestroy(void* sIdx);
void indexInit() {
// refactor later
indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");
indexRefMgt = taosOpenRef(10, indexDestroy);
}
void indexCleanUp() {
// refacto later
@ -100,7 +107,12 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
sIdx->cVersion = 1;
sIdx->path = tstrdup(path);
taosThreadMutexInit(&sIdx->mtx, NULL);
sIdx->refId = indexAddRef(sIdx);
taosAcquireRef(indexRefMgt, sIdx->refId);
*index = sIdx;
return 0;
END:
@ -112,8 +124,9 @@ END:
return -1;
}
void indexClose(SIndex* sIdx) {
void* iter = taosHashIterate(sIdx->colObj, NULL);
void indexDestroy(void* handle) {
SIndex* sIdx = handle;
void* iter = taosHashIterate(sIdx->colObj, NULL);
while (iter) {
IndexCache** pCache = iter;
if (*pCache) {
@ -128,6 +141,27 @@ void indexClose(SIndex* sIdx) {
taosMemoryFree(sIdx);
return;
}
void indexClose(SIndex* sIdx) {
indexReleaseRef(sIdx->refId);
indexRemoveRef(sIdx->refId);
}
int64_t indexAddRef(void* p) {
// impl
return taosAddRef(indexRefMgt, p);
}
int32_t indexRemoveRef(int64_t ref) {
// impl later
return taosRemoveRef(indexRefMgt, ref);
}
void indexAcquireRef(int64_t ref) {
// impl
taosAcquireRef(indexRefMgt, ref);
}
void indexReleaseRef(int64_t ref) {
// impl
taosReleaseRef(indexRefMgt, ref);
}
int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
// TODO(yihao): reduce the lock range
@ -222,6 +256,7 @@ SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colTy
tm->operType = oper;
tm->colType = colType;
#if 0
tm->colName = (char*)taosMemoryCalloc(1, nColName + 1);
memcpy(tm->colName, colName, nColName);
tm->nColName = nColName;
@ -229,6 +264,22 @@ SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colTy
tm->colVal = (char*)taosMemoryCalloc(1, nColVal + 1);
memcpy(tm->colVal, colVal, nColVal);
tm->nColVal = nColVal;
#endif
#if 1
tm->colName = (char*)taosMemoryCalloc(1, nColName + 1);
memcpy(tm->colName, colName, nColName);
tm->nColName = nColName;
char* buf = NULL;
int32_t len = indexConvertData((void*)colVal, INDEX_TYPE_GET_TYPE(colType), (void**)&buf);
assert(len != -1);
tm->colVal = buf;
tm->nColVal = len;
#endif
return tm;
}
@ -457,6 +508,7 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
} else {
indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000);
}
indexReleaseRef(sIdx->refId);
return ret;
}
void iterateValueDestroy(IterateValue* value, bool destroy) {

View File

@ -460,8 +460,11 @@ int indexCacheSchedToMerge(IndexCache* pCache) {
schedMsg.fp = doMergeWork;
schedMsg.ahandle = pCache;
schedMsg.thandle = NULL;
// schedMsg.thandle = taosMemoryCalloc(1, sizeof(int64_t));
// memcpy((char*)(schedMsg.thandle), (char*)&(pCache->index->refId), sizeof(int64_t));
schedMsg.msg = NULL;
indexAcquireRef(pCache->index->refId);
taosScheduleTask(indexQhandle, &schedMsg);
return 0;

View File

@ -16,25 +16,33 @@
#include "indexComm.h"
#include "index.h"
#include "indexInt.h"
#include "tcoding.h"
#include "tcompare.h"
#include "tdataformat.h"
char JSON_COLUMN[] = "JSON";
char JSON_VALUE_DELIM = '&';
static __compar_fn_t indexGetCompar(int8_t type) {
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
return (__compar_fn_t)strcmp;
}
return getComparFunc(type, 0);
}
static TExeCond tCompareLessThan(void* a, void* b, int8_t type) {
__compar_fn_t func = getComparFunc(type, 0);
__compar_fn_t func = indexGetCompar(type);
return tDoCommpare(func, QUERY_LESS_THAN, a, b);
}
static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) {
__compar_fn_t func = getComparFunc(type, 0);
__compar_fn_t func = indexGetCompar(type);
return tDoCommpare(func, QUERY_LESS_EQUAL, a, b);
}
static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) {
__compar_fn_t func = getComparFunc(type, 0);
__compar_fn_t func = indexGetCompar(type);
return tDoCommpare(func, QUERY_GREATER_THAN, a, b);
}
static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) {
__compar_fn_t func = getComparFunc(type, 0);
__compar_fn_t func = indexGetCompar(type);
return tDoCommpare(func, QUERY_GREATER_EQUAL, a, b);
}
@ -120,3 +128,101 @@ char* indexPackJsonDataPrefix(SIndexTerm* itm, int32_t* skip) {
return buf;
}
int32_t indexConvertData(void* src, int8_t type, void** dst) {
int tlen = -1;
switch (type) {
case TSDB_DATA_TYPE_TIMESTAMP:
tlen = taosEncodeFixedI64(NULL, *(int64_t*)src);
*dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeFixedI64(dst, *(int64_t*)src);
break;
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_UTINYINT:
tlen = taosEncodeFixedU8(NULL, *(uint8_t*)src);
*dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeFixedU8(dst, *(uint8_t*)src);
break;
case TSDB_DATA_TYPE_TINYINT:
tlen = taosEncodeFixedI8(NULL, *(uint8_t*)src);
*dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeFixedI8(dst, *(uint8_t*)src);
break;
case TSDB_DATA_TYPE_SMALLINT:
tlen = taosEncodeFixedI16(NULL, *(int16_t*)src);
*dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeFixedI16(dst, *(int16_t*)src);
break;
case TSDB_DATA_TYPE_USMALLINT:
tlen = taosEncodeFixedU16(NULL, *(uint16_t*)src);
*dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeFixedU16(dst, *(uint16_t*)src);
break;
case TSDB_DATA_TYPE_INT:
tlen = taosEncodeFixedI32(NULL, *(int32_t*)src);
*dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeFixedI32(dst, *(int32_t*)src);
break;
case TSDB_DATA_TYPE_FLOAT:
tlen = taosEncodeBinary(NULL, src, sizeof(float));
*dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeBinary(dst, src, sizeof(float));
break;
case TSDB_DATA_TYPE_UINT:
tlen = taosEncodeFixedU32(NULL, *(uint32_t*)src);
*dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeFixedU32(dst, *(uint32_t*)src);
break;
case TSDB_DATA_TYPE_BIGINT:
tlen = taosEncodeFixedI64(NULL, *(uint32_t*)src);
*dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeFixedI64(dst, *(uint32_t*)src);
break;
case TSDB_DATA_TYPE_DOUBLE:
tlen = taosEncodeBinary(NULL, src, sizeof(double));
*dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeBinary(dst, src, sizeof(double));
break;
case TSDB_DATA_TYPE_UBIGINT:
tlen = taosEncodeFixedU64(NULL, *(uint32_t*)src);
*dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeFixedU64(dst, *(uint32_t*)src);
break;
case TSDB_DATA_TYPE_NCHAR: {
tlen = taosEncodeBinary(NULL, varDataVal(src), varDataLen(src));
*dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeBinary(dst, varDataVal(src), varDataLen(src));
break;
}
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
#if 1
tlen = taosEncodeBinary(NULL, src, strlen(src));
*dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeBinary(dst, src, strlen(src));
break;
#endif
}
case TSDB_DATA_TYPE_VARBINARY:
#if 1
tlen = taosEncodeBinary(NULL, src, strlen(src));
*dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeBinary(dst, src, strlen(src));
break;
#endif
default:
TASSERT(0);
break;
}
*dst = *dst - tlen;
if (type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR && type != TSDB_DATA_TYPE_VARBINARY &&
type == TSDB_DATA_TYPE_VARCHAR) {
uint8_t* p = *dst;
for (int i = 0; i < tlen; i++) {
if (p[i] == 0) {
p[i] = (uint8_t)'0';
}
}
}
return tlen;
}

View File

@ -82,7 +82,10 @@ FstSlice fstSliceCreate(uint8_t* data, uint64_t len) {
str->ref = 1;
str->len = len;
str->data = taosMemoryMalloc(len * sizeof(uint8_t));
memcpy(str->data, data, len);
if (data != NULL) {
memcpy(str->data, data, len);
}
FstSlice s = {.str = str, .start = 0, .end = len - 1};
return s;

View File

@ -469,13 +469,19 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempR
while ((rt = streamWithStateNextWith(st, NULL)) != NULL) {
FstSlice* s = &rt->data;
char* ch = (char*)fstSliceData(s, NULL);
if (0 != strncmp(ch, p, skip)) {
int32_t sz = 0;
char* ch = (char*)fstSliceData(s, &sz);
char* tmp = taosMemoryCalloc(1, sz + 1);
memcpy(tmp, ch, sz);
if (0 != strncmp(tmp, p, skip)) {
swsResultDestroy(rt);
taosMemoryFree(tmp);
break;
}
TExeCond cond = cmpFn(ch + skip, tem->colVal, tem->colType);
TExeCond cond = cmpFn(tmp + skip, tem->colVal, INDEX_TYPE_GET_TYPE(tem->colType));
if (MATCH == cond) {
tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total);
} else if (CONTINUE == cond) {
@ -483,6 +489,7 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempR
swsResultDestroy(rt);
break;
}
taosMemoryFree(tmp);
swsResultDestroy(rt);
}
streamWithStateDestroy(st);

View File

@ -17,12 +17,32 @@
#include "tutil.h"
static std::string dir = "/tmp/json";
static std::string logDir = "/tmp/log";
static void initLog() {
const char* defaultLogFileNamePrefix = "taoslog";
const int32_t maxLogFileNum = 10;
tsAsyncLog = 0;
sDebugFlag = 143;
strcpy(tsLogDir, logDir.c_str());
taosRemoveDir(tsLogDir);
taosMkDir(tsLogDir);
if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
printf("failed to open log file in directory:%s\n", tsLogDir);
}
}
class JsonEnv : public ::testing::Test {
protected:
virtual void SetUp() {
taosRemoveDir(logDir.c_str());
taosMkDir(logDir.c_str());
taosRemoveDir(dir.c_str());
taosMkDir(dir.c_str());
printf("set up\n");
initLog();
opts = indexOptsCreate();
int ret = tIndexJsonOpen(opts, dir.c_str(), &index);
assert(ret == 0);

View File

@ -318,15 +318,19 @@ static int32_t jsonToTableComInfo(const SJson* pJson, void* pObj) {
STableComInfo* pNode = (STableComInfo*)pObj;
int32_t code;
tjsonGetNumberValue(pJson, jkTableComInfoNumOfTags, pNode->numOfTags, code);;
tjsonGetNumberValue(pJson, jkTableComInfoNumOfTags, pNode->numOfTags, code);
;
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableComInfoPrecision, pNode->precision, code);;
tjsonGetNumberValue(pJson, jkTableComInfoPrecision, pNode->precision, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableComInfoNumOfColumns, pNode->numOfColumns, code);;
tjsonGetNumberValue(pJson, jkTableComInfoNumOfColumns, pNode->numOfColumns, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableComInfoRowSize, pNode->rowSize, code);;
tjsonGetNumberValue(pJson, jkTableComInfoRowSize, pNode->rowSize, code);
;
}
return code;
@ -358,12 +362,15 @@ static int32_t jsonToSchema(const SJson* pJson, void* pObj) {
SSchema* pNode = (SSchema*)pObj;
int32_t code;
tjsonGetNumberValue(pJson, jkSchemaType, pNode->type, code);;
tjsonGetNumberValue(pJson, jkSchemaType, pNode->type, code);
;
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkSchemaColId, pNode->colId, code);;
tjsonGetNumberValue(pJson, jkSchemaColId, pNode->colId, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkSchemaBytes, pNode->bytes, code);;
tjsonGetNumberValue(pJson, jkSchemaBytes, pNode->bytes, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetStringValue(pJson, jkSchemaName, pNode->name);
@ -415,21 +422,27 @@ static int32_t jsonToTableMeta(const SJson* pJson, void* pObj) {
STableMeta* pNode = (STableMeta*)pObj;
int32_t code;
tjsonGetNumberValue(pJson, jkTableMetaVgId, pNode->vgId, code);;
tjsonGetNumberValue(pJson, jkTableMetaVgId, pNode->vgId, code);
;
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableMetaTableType, pNode->tableType, code);;
tjsonGetNumberValue(pJson, jkTableMetaTableType, pNode->tableType, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableMetaUid, pNode->uid, code);;
tjsonGetNumberValue(pJson, jkTableMetaUid, pNode->uid, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableMetaSuid, pNode->suid, code);;
tjsonGetNumberValue(pJson, jkTableMetaSuid, pNode->suid, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableMetaSversion, pNode->sversion, code);;
tjsonGetNumberValue(pJson, jkTableMetaSversion, pNode->sversion, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableMetaTversion, pNode->tversion, code);;
tjsonGetNumberValue(pJson, jkTableMetaTversion, pNode->tversion, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, jkTableMetaComInfo, jsonToTableComInfo, &pNode->tableInfo);
@ -605,7 +618,8 @@ static int32_t jsonToLogicFillNode(const SJson* pJson, void* pObj) {
int32_t code = jsonToLogicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkFillLogicPlanMode, pNode->mode, code);;
tjsonGetNumberValue(pJson, jkFillLogicPlanMode, pNode->mode, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkFillLogicPlanWStartTs, &pNode->pWStartTs);
@ -881,7 +895,8 @@ static int32_t jsonToLogicSubplan(const SJson* pJson, void* pObj) {
code = jsonToNodeObject(pJson, jkLogicSubplanRootNode, (SNode**)&pNode->pNode);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkLogicSubplanType, pNode->subplanType, code);;
tjsonGetNumberValue(pJson, jkLogicSubplanType, pNode->subplanType, code);
;
}
int32_t objSize = 0;
if (TSDB_CODE_SUCCESS == code) {
@ -1121,33 +1136,43 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
code = tjsonGetDoubleValue(pJson, jkTableScanPhysiPlanRatio, &pNode->ratio);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanDataRequired, pNode->dataRequired, code);;
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanDataRequired, pNode->dataRequired, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkTableScanPhysiPlanDynamicScanFuncs, &pNode->pDynamicScanFuncs);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanInterval, pNode->interval, code);;
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanInterval, pNode->interval, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanOffset, pNode->offset, code);;
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanOffset, pNode->offset, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSliding, pNode->sliding, code);;
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSliding, pNode->sliding, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanIntervalUnit, pNode->intervalUnit, code);;
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanIntervalUnit, pNode->intervalUnit, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSlidingUnit, pNode->slidingUnit, code);;
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSlidingUnit, pNode->slidingUnit, code);
;
}
return code;
}
static int32_t physiStreamScanNodeToJson(const void* pObj, SJson* pJson) { return physiTableScanNodeToJson(pObj, pJson); }
static int32_t physiStreamScanNodeToJson(const void* pObj, SJson* pJson) {
return physiTableScanNodeToJson(pObj, pJson);
}
static int32_t jsonToPhysiStreamScanNode(const SJson* pJson, void* pObj) { return jsonToPhysiTableScanNode(pJson, pObj); }
static int32_t jsonToPhysiStreamScanNode(const SJson* pJson, void* pObj) {
return jsonToPhysiTableScanNode(pJson, pObj);
}
static const char* jkSysTableScanPhysiPlanMnodeEpSet = "MnodeEpSet";
static const char* jkSysTableScanPhysiPlanShowRewrite = "ShowRewrite";
@ -1181,7 +1206,8 @@ static int32_t jsonToPhysiSysTableScanNode(const SJson* pJson, void* pObj) {
code = tjsonGetBoolValue(pJson, jkSysTableScanPhysiPlanShowRewrite, &pNode->showRewrite);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkSysTableScanPhysiPlanAccountId, pNode->accountId, code);;
tjsonGetNumberValue(pJson, jkSysTableScanPhysiPlanAccountId, pNode->accountId, code);
;
}
return code;
@ -1265,7 +1291,8 @@ static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) {
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkJoinPhysiPlanJoinType, pNode->joinType, code);;
tjsonGetNumberValue(pJson, jkJoinPhysiPlanJoinType, pNode->joinType, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkJoinPhysiPlanOnConditions, &pNode->pOnConditions);
@ -1427,10 +1454,12 @@ static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) {
code = jsonToNodeObject(pJson, jkWindowPhysiPlanTsPk, (SNode**)&pNode->pTspk);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkWindowPhysiPlanTriggerType, pNode->triggerType, code);;
tjsonGetNumberValue(pJson, jkWindowPhysiPlanTriggerType, pNode->triggerType, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkWindowPhysiPlanWatermark, pNode->watermark, code);;
tjsonGetNumberValue(pJson, jkWindowPhysiPlanWatermark, pNode->watermark, code);
;
}
return code;
@ -1526,7 +1555,8 @@ static int32_t jsonToPhysiFillNode(const SJson* pJson, void* pObj) {
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkFillPhysiPlanMode, pNode->mode, code);;
tjsonGetNumberValue(pJson, jkFillPhysiPlanMode, pNode->mode, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkFillPhysiPlanWStartTs, &pNode->pWStartTs);
@ -1565,7 +1595,8 @@ static int32_t jsonToPhysiSessionWindowNode(const SJson* pJson, void* pObj) {
int32_t code = jsonToPhysiWindowNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkSessionWindowPhysiPlanGap, pNode->gap, code);;
tjsonGetNumberValue(pJson, jkSessionWindowPhysiPlanGap, pNode->gap, code);
;
}
return code;
@ -1727,7 +1758,8 @@ static int32_t jsonToSubplan(const SJson* pJson, void* pObj) {
int32_t code = tjsonToObject(pJson, jkSubplanId, jsonToSubplanId, &pNode->id);
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkSubplanType, pNode->subplanType, code);;
tjsonGetNumberValue(pJson, jkSubplanType, pNode->subplanType, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkSubplanMsgType, &pNode->msgType);
@ -1917,7 +1949,8 @@ static int32_t jsonToColumnNode(const SJson* pJson, void* pObj) {
code = tjsonGetSmallIntValue(pJson, jkColumnColId, &pNode->colId);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkColumnColType, pNode->colType, code);;
tjsonGetNumberValue(pJson, jkColumnColType, pNode->colType, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetStringValue(pJson, jkColumnDbName, pNode->dbName);
@ -2171,7 +2204,8 @@ static int32_t jsonToOperatorNode(const SJson* pJson, void* pObj) {
int32_t code = jsonToExprNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkOperatorType, pNode->opType, code);;
tjsonGetNumberValue(pJson, jkOperatorType, pNode->opType, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkOperatorLeft, &pNode->pLeft);
@ -2205,7 +2239,8 @@ static int32_t jsonToLogicConditionNode(const SJson* pJson, void* pObj) {
int32_t code = jsonToExprNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkLogicCondType, pNode->condType, code);;
tjsonGetNumberValue(pJson, jkLogicCondType, pNode->condType, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkLogicCondParameters, &pNode->pParameterList);
@ -2350,6 +2385,30 @@ static int32_t jsonToRealTableNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkTempTableSubquery = "Subquery";
static int32_t tempTableNodeToJson(const void* pObj, SJson* pJson) {
const STempTableNode* pNode = (const STempTableNode*)pObj;
int32_t code = tableNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkTempTableSubquery, nodeToJson, pNode->pSubquery);
}
return code;
}
static int32_t jsonToTempTableNode(const SJson* pJson, void* pObj) {
STempTableNode* pNode = (STempTableNode*)pObj;
int32_t code = jsonToTableNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkTempTableSubquery, &pNode->pSubquery);
}
return code;
}
static const char* jkGroupingSetType = "GroupingSetType";
static const char* jkGroupingSetParameter = "Parameters";
@ -2387,10 +2446,12 @@ static int32_t jsonToOrderByExprNode(const SJson* pJson, void* pObj) {
int32_t code = jsonToNodeObject(pJson, jkOrderByExprExpr, &pNode->pExpr);
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkOrderByExprOrder, pNode->order, code);;
tjsonGetNumberValue(pJson, jkOrderByExprOrder, pNode->order, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkOrderByExprNullOrder, pNode->nullOrder, code);;
tjsonGetNumberValue(pJson, jkOrderByExprNullOrder, pNode->nullOrder, code);
;
}
return code;
@ -2497,7 +2558,8 @@ static int32_t jsonToFillNode(const SJson* pJson, void* pObj) {
SFillNode* pNode = (SFillNode*)pObj;
int32_t code;
tjsonGetNumberValue(pJson, jkFillMode, pNode->mode, code);;
tjsonGetNumberValue(pJson, jkFillMode, pNode->mode, code);
;
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkFillValues, &pNode->pValues);
}
@ -2663,6 +2725,60 @@ static int32_t jsonToDataBlockDescNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkSetOperatorOpType = "OpType";
static const char* jkSetOperatorProjections = "Projections";
static const char* jkSetOperatorLeft = "Left";
static const char* jkSetOperatorRight = "Right";
static const char* jkSetOperatorOrderByList = "OrderByList";
static const char* jkSetOperatorLimit = "Limit";
static int32_t setOperatorToJson(const void* pObj, SJson* pJson) {
const SSetOperator* pNode = (const SSetOperator*)pObj;
int32_t code = tjsonAddIntegerToObject(pJson, jkSetOperatorOpType, pNode->opType);
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkSetOperatorProjections, pNode->pProjectionList);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkSetOperatorLeft, nodeToJson, pNode->pLeft);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkSetOperatorRight, nodeToJson, pNode->pRight);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkSetOperatorOrderByList, pNode->pOrderByList);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkSetOperatorLimit, nodeToJson, pNode->pLimit);
}
return code;
}
static int32_t jsonToSetOperator(const SJson* pJson, void* pObj) {
SSetOperator* pNode = (SSetOperator*)pObj;
int32_t code = TSDB_CODE_SUCCESS;
tjsonGetNumberValue(pJson, jkSetOperatorOpType, pNode->opType, code);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkSetOperatorProjections, &pNode->pProjectionList);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkSetOperatorLeft, &pNode->pLeft);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkSetOperatorRight, &pNode->pRight);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkSetOperatorOrderByList, &pNode->pOrderByList);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkSetOperatorLimit, &pNode->pLimit);
}
return code;
}
static const char* jkSelectStmtDistinct = "Distinct";
static const char* jkSelectStmtProjections = "Projections";
static const char* jkSelectStmtFrom = "From";
@ -2677,7 +2793,7 @@ static const char* jkSelectStmtSlimit = "Slimit";
static const char* jkSelectStmtStmtName = "StmtName";
static const char* jkSelectStmtHasAggFuncs = "HasAggFuncs";
static int32_t selectStmtTojson(const void* pObj, SJson* pJson) {
static int32_t selectStmtToJson(const void* pObj, SJson* pJson) {
const SSelectStmt* pNode = (const SSelectStmt*)pObj;
int32_t code = tjsonAddBoolToObject(pJson, jkSelectStmtDistinct, pNode->isDistinct);
@ -2819,6 +2935,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_REAL_TABLE:
return realTableNodeToJson(pObj, pJson);
case QUERY_NODE_TEMP_TABLE:
return tempTableNodeToJson(pObj, pJson);
case QUERY_NODE_JOIN_TABLE:
break;
case QUERY_NODE_GROUPING_SET:
@ -2848,9 +2965,9 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_DOWNSTREAM_SOURCE:
return downstreamSourceNodeToJson(pObj, pJson);
case QUERY_NODE_SET_OPERATOR:
break;
return setOperatorToJson(pObj, pJson);
case QUERY_NODE_SELECT_STMT:
return selectStmtTojson(pObj, pJson);
return selectStmtToJson(pObj, pJson);
case QUERY_NODE_VNODE_MODIF_STMT:
case QUERY_NODE_CREATE_DATABASE_STMT:
case QUERY_NODE_CREATE_TABLE_STMT:
@ -2918,7 +3035,6 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_PHYSICAL_PLAN:
return planToJson(pObj, pJson);
default:
// assert(0);
break;
}
nodesWarn("specificNodeToJson unknown node = %s", nodesNodeName(nodeType(pObj)));
@ -2939,6 +3055,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToFunctionNode(pJson, pObj);
case QUERY_NODE_REAL_TABLE:
return jsonToRealTableNode(pJson, pObj);
case QUERY_NODE_TEMP_TABLE:
return jsonToTempTableNode(pJson, pObj);
case QUERY_NODE_ORDER_BY_EXPR:
return jsonToOrderByExprNode(pJson, pObj);
case QUERY_NODE_INTERVAL_WINDOW:
@ -2955,6 +3073,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToSlotDescNode(pJson, pObj);
case QUERY_NODE_DOWNSTREAM_SOURCE:
return jsonToDownstreamSourceNode(pJson, pObj);
case QUERY_NODE_SET_OPERATOR:
return jsonToSetOperator(pJson, pObj);
case QUERY_NODE_SELECT_STMT:
return jsonToSelectStmt(pJson, pObj);
case QUERY_NODE_CREATE_TOPIC_STMT:
@ -3007,7 +3127,6 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
case QUERY_NODE_PHYSICAL_PLAN:
return jsonToPlan(pJson, pObj);
default:
assert(0);
break;
}
nodesWarn("jsonToSpecificNode unknown node = %s", nodesNodeName(nodeType(pObj)));
@ -3038,7 +3157,8 @@ static int32_t jsonToNode(const SJson* pJson, void* pObj) {
SNode* pNode = (SNode*)pObj;
int32_t code;
tjsonGetNumberValue(pJson, jkNodeType, pNode->type, code);;
tjsonGetNumberValue(pJson, jkNodeType, pNode->type, code);
;
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, nodesNodeName(pNode->type), jsonToSpecificNode, pNode);
if (TSDB_CODE_SUCCESS != code) {

View File

@ -14,6 +14,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <regex.h>
#include "parAst.h"
#include "parUtil.h"
#include "ttime.h"
@ -76,6 +78,19 @@ static bool checkUserName(SAstCreateContext* pCxt, SToken* pUserName) {
return TSDB_CODE_SUCCESS == pCxt->errCode;
}
static bool invalidPassword(const char* pPassword) {
regex_t regex;
if (regcomp(&regex, "[ '\"`\\]", REG_EXTENDED | REG_ICASE) != 0) {
return false;
}
/* Execute regular expression */
int32_t res = regexec(&regex, pPassword, 0, NULL, 0);
regfree(&regex);
return 0 == res;
}
static bool checkPassword(SAstCreateContext* pCxt, const SToken* pPasswordToken, char* pPassword) {
if (NULL == pPasswordToken) {
pCxt->errCode = TSDB_CODE_PAR_SYNTAX_ERROR;
@ -86,6 +101,8 @@ static bool checkPassword(SAstCreateContext* pCxt, const SToken* pPasswordToken,
strdequote(pPassword);
if (strtrim(pPassword) <= 0) {
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_PASSWD_EMPTY);
} else if (invalidPassword(pPassword)) {
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_PASSWD);
}
}
return TSDB_CODE_SUCCESS == pCxt->errCode;

View File

@ -65,13 +65,19 @@ static int32_t authSetOperator(SAuthCxt* pCxt, SSetOperator* pSetOper) {
return code;
}
static int32_t authDropUser(SAuthCxt* pCxt, SDropUserReq* pStmt) {
if (!pCxt->pParseCxt->isSuperUser || 0 == strcmp(pStmt->user, TSDB_DEFAULT_USER)) {
return TSDB_CODE_PAR_PERMISSION_DENIED;
}
return TSDB_CODE_SUCCESS;
}
static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) {
switch (nodeType(pStmt)) {
case QUERY_NODE_SET_OPERATOR:
return authSetOperator(pCxt, (SSetOperator*)pStmt);
case QUERY_NODE_SELECT_STMT:
return authSelect(pCxt, (SSelectStmt*)pStmt);
case QUERY_NODE_VNODE_MODIF_STMT:
case QUERY_NODE_CREATE_DATABASE_STMT:
case QUERY_NODE_DROP_DATABASE_STMT:
case QUERY_NODE_ALTER_DATABASE_STMT:
@ -84,7 +90,10 @@ static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) {
case QUERY_NODE_ALTER_TABLE_STMT:
case QUERY_NODE_CREATE_USER_STMT:
case QUERY_NODE_ALTER_USER_STMT:
case QUERY_NODE_DROP_USER_STMT:
break;
case QUERY_NODE_DROP_USER_STMT: {
return authDropUser(pCxt, (SDropUserReq*)pStmt);
}
case QUERY_NODE_USE_DATABASE_STMT:
case QUERY_NODE_CREATE_DNODE_STMT:
case QUERY_NODE_DROP_DNODE_STMT:

View File

@ -262,9 +262,9 @@ static int32_t calcConstQuery(SCalcConstContext* pCxt, SNode* pStmt, bool subque
break;
case QUERY_NODE_SET_OPERATOR: {
SSetOperator* pSetOp = (SSetOperator*)pStmt;
code = calcConstQuery(pCxt, pSetOp->pLeft, subquery);
code = calcConstQuery(pCxt, pSetOp->pLeft, false);
if (TSDB_CODE_SUCCESS == code) {
code = calcConstQuery(pCxt, pSetOp->pRight, subquery);
code = calcConstQuery(pCxt, pSetOp->pRight, false);
}
break;
}

View File

@ -480,6 +480,31 @@ static EDealRes translateColumn(STranslateContext* pCxt, SColumnNode* pCol) {
return res;
}
static int32_t parseTimeFromValueNode(SValueNode* pVal) {
if (IS_SIGNED_NUMERIC_TYPE(pVal->node.resType.type)) {
return TSDB_CODE_SUCCESS;
} else if (IS_UNSIGNED_NUMERIC_TYPE(pVal->node.resType.type)) {
pVal->datum.i = pVal->datum.u;
return TSDB_CODE_SUCCESS;
} else if (IS_FLOAT_TYPE(pVal->node.resType.type)) {
pVal->datum.i = pVal->datum.d;
return TSDB_CODE_SUCCESS;
} else if (TSDB_DATA_TYPE_BOOL == pVal->node.resType.type) {
pVal->datum.i = pVal->datum.b;
return TSDB_CODE_SUCCESS;
} else if (IS_VAR_DATA_TYPE(pVal->node.resType.type) || TSDB_DATA_TYPE_TIMESTAMP == pVal->node.resType.type) {
if (TSDB_CODE_SUCCESS == taosParseTime(pVal->literal, &pVal->datum.i, pVal->node.resType.bytes,
pVal->node.resType.precision, tsDaylight)) {
return TSDB_CODE_SUCCESS;
}
char* pEnd = NULL;
pVal->datum.i = strtoll(pVal->literal, &pEnd, 10);
return (NULL != pEnd && '\0' == *pEnd) ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED;
} else {
return TSDB_CODE_FAILED;
}
}
static EDealRes translateValueImpl(STranslateContext* pCxt, SValueNode* pVal, SDataType targetDt) {
uint8_t precision = (NULL != pCxt->pCurrStmt ? pCxt->pCurrStmt->precision : targetDt.precision);
pVal->node.resType.precision = precision;
@ -571,7 +596,7 @@ static EDealRes translateValueImpl(STranslateContext* pCxt, SValueNode* pVal, SD
break;
}
case TSDB_DATA_TYPE_TIMESTAMP: {
if (taosParseTime(pVal->literal, &pVal->datum.i, targetDt.bytes, precision, tsDaylight) != TSDB_CODE_SUCCESS) {
if (TSDB_CODE_SUCCESS != parseTimeFromValueNode(pVal)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
}
*(int64_t*)&pVal->typeData = pVal->datum.i;
@ -1660,10 +1685,10 @@ static int32_t createPrimaryKeyColByTable(STranslateContext* pCxt, STableNode* p
if (NULL == pCol) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (QUERY_NODE_REAL_TABLE == nodeType(pTable)) {
setColumnInfoBySchema((SRealTableNode*)pTable, ((SRealTableNode*)pTable)->pMeta->schema, false, pCol);
} else {
// todo
pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
strcpy(pCol->colName, PK_TS_COL_INTERNAL_NAME);
if (!findAndSetColumn(pCol, pTable)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TIMELINE_FUNC);
}
*pPrimaryKey = (SNode*)pCol;
return TSDB_CODE_SUCCESS;

View File

@ -148,6 +148,10 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "Invalid number of tag columns";
case TSDB_CODE_PAR_INVALID_INTERNAL_PK:
return "Invalid _c0 or _rowts expression";
case TSDB_CODE_PAR_INVALID_TIMELINE_FUNC:
return "Invalid timeline function";
case TSDB_CODE_PAR_INVALID_PASSWD:
return "Invalid password";
case TSDB_CODE_OUT_OF_MEMORY:
return "Out of memory";
default:

View File

@ -187,7 +187,7 @@ TEST_F(ParserSelectTest, semanticError) {
run("SELECT c2 FROM t1 tt1, t1 tt2 WHERE tt1.c1 = tt2.c1", TSDB_CODE_PAR_AMBIGUOUS_COLUMN, PARSER_STAGE_TRANSLATE);
// TSDB_CODE_PAR_WRONG_VALUE_TYPE
run("SELECT timestamp '2010' FROM t1", TSDB_CODE_PAR_WRONG_VALUE_TYPE, PARSER_STAGE_TRANSLATE);
run("SELECT timestamp '2010a' FROM t1", TSDB_CODE_PAR_WRONG_VALUE_TYPE, PARSER_STAGE_TRANSLATE);
// TSDB_CODE_PAR_ILLEGAL_USE_AGG_FUNCTION
run("SELECT c2 FROM t1 tt1 join t1 tt2 on COUNT(*) > 0", TSDB_CODE_PAR_ILLEGAL_USE_AGG_FUNCTION,
@ -235,9 +235,11 @@ TEST_F(ParserSelectTest, semanticError) {
TEST_F(ParserSelectTest, setOperator) {
useDb("root", "test");
run("SELECT * FROM t1 UNION ALL SELECT * FROM t1");
// run("SELECT * FROM t1 UNION ALL SELECT * FROM t1");
run("(SELECT * FROM t1) UNION ALL (SELECT * FROM t1)");
// run("(SELECT * FROM t1) UNION ALL (SELECT * FROM t1)");
run("SELECT c1 FROM (SELECT c1 FROM t1 UNION ALL SELECT c1 FROM t1)");
}
} // namespace ParserTest

View File

@ -582,7 +582,7 @@ static bool cpdIsPrimaryKeyEqualCond(SJoinLogicNode* pJoin, SNode* pCond) {
return false;
}
SOperatorNode* pOper = (SOperatorNode*)pJoin->pOnConditions;
SOperatorNode* pOper = (SOperatorNode*)pCond;
if (OP_TYPE_EQUAL != pOper->opType) {
return false;
}
@ -608,12 +608,16 @@ static int32_t cpdCheckLogicCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin,
if (LOGIC_COND_TYPE_AND != pOnCond->condType) {
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL);
}
bool hasPrimaryKeyEqualCond = false;
SNode* pCond = NULL;
FOREACH(pCond, pOnCond->pParameterList) {
if (!cpdIsPrimaryKeyEqualCond(pJoin, pCond)) {
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL);
if (cpdIsPrimaryKeyEqualCond(pJoin, pCond)) {
hasPrimaryKeyEqualCond = true;
}
}
if (!hasPrimaryKeyEqualCond) {
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL);
}
return TSDB_CODE_SUCCESS;
}

View File

@ -261,6 +261,22 @@ typedef struct SSetSlotIdCxt {
SHashObj* pRightHash;
} SSetSlotIdCxt;
static void dumpSlots(const char* pName, SHashObj* pHash) {
if (NULL == pHash) {
return;
}
planDebug("%s", pName);
void* pIt = taosHashIterate(pHash, NULL);
while (NULL != pIt) {
size_t len = 0;
char* pKey = taosHashGetKey(pIt, &len);
char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN] = {0};
strncpy(name, pKey, len);
planDebug("\tslot name = %s", name);
pIt = taosHashIterate(pHash, pIt);
}
}
static EDealRes doSetSlotId(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode) && 0 != strcmp(((SColumnNode*)pNode)->colName, "*")) {
SSetSlotIdCxt* pCxt = (SSetSlotIdCxt*)pContext;
@ -273,6 +289,8 @@ static EDealRes doSetSlotId(SNode* pNode, void* pContext) {
// pIndex is definitely not NULL, otherwise it is a bug
if (NULL == pIndex) {
planError("doSetSlotId failed, invalid slot name %s", name);
dumpSlots("left datablock desc", pCxt->pLeftHash);
dumpSlots("right datablock desc", pCxt->pRightHash);
pCxt->errCode = TSDB_CODE_PLAN_INTERNAL_ERROR;
return DEAL_RES_ERROR;
}

View File

@ -23,10 +23,16 @@ class PlanJoinTest : public PlannerTestBase {};
TEST_F(PlanJoinTest, basic) {
useDb("root", "test");
run("select t1.c1, t2.c2 from st1s1 t1, st1s2 t2 where t1.ts = t2.ts");
run("SELECT t1.c1, t2.c2 FROM st1s1 t1, st1s2 t2 WHERE t1.ts = t2.ts");
run("select t1.*, t2.* from st1s1 t1, st1s2 t2 where t1.ts = t2.ts");
run("SELECT t1.*, t2.* FROM st1s1 t1, st1s2 t2 WHERE t1.ts = t2.ts");
// run("select t1.c1, t2.c1 from st1s1 t1 join st1s2 t2 on t1.ts = t2.ts where t1.c1 > t2.c1 and t1.c2 = 'abc' and "
// "t2.c2 = 'qwe'");
run("SELECT t1.c1, t2.c1 FROM st1s1 t1 JOIN st1s2 t2 ON t1.ts = t2.ts");
}
TEST_F(PlanJoinTest, withWhere) {
useDb("root", "test");
run("SELECT t1.c1, t2.c1 FROM st1s1 t1 JOIN st1s2 t2 ON t1.ts = t2.ts "
"WHERE t1.c1 > t2.c1 AND t1.c2 = 'abc' AND t2.c2 = 'qwe'");
}

View File

@ -23,9 +23,13 @@ class PlanSubqeuryTest : public PlannerTestBase {};
TEST_F(PlanSubqeuryTest, basic) {
useDb("root", "test");
run("SELECT * FROM (SELECT * FROM t1)");
if (0 == g_skipSql) {
run("SELECT * FROM (SELECT * FROM t1)");
// run("SELECT LAST(c1) FROM ( SELECT * FROM t1)");
run("SELECT LAST(c1) FROM (SELECT * FROM t1)");
}
run("SELECT c1 FROM (SELECT c1 FROM t1 UNION ALL SELECT c1 FROM t1)");
}
TEST_F(PlanSubqeuryTest, doubleGroupBy) {

View File

@ -25,23 +25,53 @@ class PlannerEnv : public testing::Environment {
virtual void SetUp() {
initMetaDataEnv();
generateMetaData();
initLog("/tmp/td");
}
virtual void TearDown() { destroyMetaDataEnv(); }
PlannerEnv() {}
virtual ~PlannerEnv() {}
private:
void initLog(const char* path) {
dDebugFlag = 143;
vDebugFlag = 0;
mDebugFlag = 143;
cDebugFlag = 0;
jniDebugFlag = 0;
tmrDebugFlag = 135;
uDebugFlag = 135;
rpcDebugFlag = 143;
qDebugFlag = 143;
wDebugFlag = 0;
sDebugFlag = 0;
tsdbDebugFlag = 0;
tsLogEmbedded = 1;
tsAsyncLog = 0;
taosRemoveDir(path);
taosMkDir(path);
tstrncpy(tsLogDir, path, PATH_MAX);
if (taosInitLog("taoslog", 1) != 0) {
std::cout << "failed to init log file" << std::endl;
}
}
};
static void parseArg(int argc, char* argv[]) {
int opt = 0;
const char* optstring = "";
static struct option long_options[] = {{"dump", optional_argument, NULL, 'd'}, {0, 0, 0, 0}};
static struct option long_options[] = {
{"dump", optional_argument, NULL, 'd'}, {"skipSql", optional_argument, NULL, 's'}, {0, 0, 0, 0}};
while ((opt = getopt_long(argc, argv, optstring, long_options, NULL)) != -1) {
switch (opt) {
case 'd':
setDumpModule(optarg);
break;
case 's':
g_skipSql = 1;
break;
default:
break;
}

View File

@ -47,6 +47,7 @@ enum DumpModule {
};
DumpModule g_dumpModule = DUMP_MODULE_NOTHING;
int32_t g_skipSql = 0;
void setDumpModule(const char* pModule) {
if (NULL == pModule) {

View File

@ -32,6 +32,8 @@ class PlannerTestBase : public testing::Test {
std::unique_ptr<PlannerTestBaseImpl> impl_;
};
extern int32_t g_skipSql;
extern void setDumpModule(const char* pModule);
#endif // PLAN_TEST_UTIL_H

View File

@ -27,13 +27,11 @@ typedef struct SScalarCtx {
SArray *pBlockList; /* element is SSDataBlock* */
SHashObj *pRes; /* element is SScalarParam */
void *param; // additional parameter (meta actually) for acquire value such as tbname/tags values
SHashObj *udf2Handle;
} SScalarCtx;
#define SCL_DATA_TYPE_DUMMY_HASH 9000
#define SCL_DEFAULT_OP_NUM 10
#define SCL_DEFAULT_UDF_NUM 8
#define SCL_IS_CONST_NODE(_node) ((NULL == (_node)) || (QUERY_NODE_VALUE == (_node)->type) || (QUERY_NODE_NODE_LIST == (_node)->type))
#define SCL_IS_CONST_CALC(_ctx) (NULL == (_ctx)->pBlockList)

View File

@ -154,18 +154,6 @@ void sclFreeRes(SHashObj *res) {
taosHashCleanup(res);
}
void sclFreeUdfHandles(SHashObj *udf2handle) {
void *pIter = taosHashIterate(udf2handle, NULL);
while (pIter) {
UdfcFuncHandle *handle = (UdfcFuncHandle *)pIter;
if (handle) {
teardownUdf(*handle);
}
pIter = taosHashIterate(udf2handle, pIter);
}
taosHashCleanup(udf2handle);
}
void sclFreeParam(SScalarParam *param) {
if (param->columnData != NULL) {
colDataDestroy(param->columnData);
@ -375,28 +363,22 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp
if (fmIsUserDefinedFunc(node->funcId)) {
UdfcFuncHandle udfHandle = NULL;
char* udfName = node->functionName;
if (ctx->udf2Handle) {
UdfcFuncHandle *pHandle = taosHashGet(ctx->udf2Handle, udfName, strlen(udfName));
if (pHandle) {
udfHandle = *pHandle;
}
}
if (udfHandle == NULL) {
code = setupUdf(udfName, &udfHandle);
if (code != 0) {
sclError("fmExecFunction error. setupUdf. function name: %s, code:%d", udfName, code);
goto _return;
}
if (ctx->udf2Handle) {
taosHashPut(ctx->udf2Handle, udfName, strlen(udfName), &udfHandle, sizeof(UdfcFuncHandle));
}
code = setupUdf(node->functionName, &udfHandle);
if (code != 0) {
sclError("fmExecFunction error. setupUdf. function name: %s, code:%d", node->functionName, code);
goto _return;
}
code = callUdfScalarFunc(udfHandle, params, paramNum, output);
if (code != 0) {
sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code);
goto _return;
}
code = teardownUdf(udfHandle);
if (code != 0) {
sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code);
goto _return;
}
} else {
SScalarFuncExecFuncs ffpSet = {0};
code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet);
@ -910,20 +892,15 @@ int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) {
SScalarCtx ctx = {0};
ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (NULL == ctx.pRes) {
sclError("taosHashInit result map failed, num:%d", SCL_DEFAULT_OP_NUM);
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
ctx.udf2Handle = taosHashInit(SCL_DEFAULT_UDF_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (NULL == ctx.udf2Handle) {
sclError("taosHashInit udf to handle map failed, num:%d", SCL_DEFAULT_OP_NUM);
sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM);
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
nodesRewriteExprPostOrder(&pNode, sclConstantsRewriter, (void *)&ctx);
SCL_ERR_JRET(ctx.code);
*pRes = pNode;
_return:
sclFreeUdfHandles(ctx.udf2Handle);
sclFreeRes(ctx.pRes);
return code;
}
@ -939,14 +916,10 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
// TODO: OPT performance
ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (NULL == ctx.pRes) {
sclError("taosHashInit result map failed, num:%d", SCL_DEFAULT_OP_NUM);
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
ctx.udf2Handle = taosHashInit(SCL_DEFAULT_UDF_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (NULL == ctx.udf2Handle) {
sclError("taosHashInit udf to handle map failed, num:%d", SCL_DEFAULT_OP_NUM);
sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM);
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
nodesWalkExprPostOrder(pNode, sclCalcWalker, (void *)&ctx);
SCL_ERR_JRET(ctx.code);
@ -964,7 +937,6 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
_return:
//nodesDestroyNode(pNode);
sclFreeUdfHandles(ctx.udf2Handle);
sclFreeRes(ctx.pRes);
return code;
}

View File

@ -1,5 +1,5 @@
# tdb
add_library(tdb SHARED "")
add_library(tdb STATIC "")
target_sources(tdb
PRIVATE
"src/db/tdbPCache.c"

View File

@ -463,7 +463,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_NO_PIPE, "udf no pipe")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_LOAD_UDF_FAILURE, "udf load failure")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_STATE, "udf invalid state")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_INPUT, "udf invalid function input")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_NO_FUNC_HANDLE, "udf no function handle")
//schemaless
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PROTOCOL_TYPE, "Invalid line protocol type")
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PRECISION_TYPE, "Invalid timestamp precision type")

View File

@ -604,6 +604,7 @@ int32_t shellCalcColWidth(TAOS_FIELD *field, int32_t precision) {
case TSDB_DATA_TYPE_DOUBLE:
return TMAX(25, width);
case TSDB_DATA_TYPE_JSON:
case TSDB_DATA_TYPE_BINARY:
if (field->bytes > shell.args.displayWidth) {
return TMAX(shell.args.displayWidth, width);