Merge remote-tracking branch 'origin/3.0' into feature/shm

This commit is contained in:
Shengliang Guan 2022-04-02 11:02:10 +08:00
commit dc7147760b
19 changed files with 400 additions and 459 deletions

View File

@ -1396,10 +1396,9 @@ typedef struct {
typedef struct { typedef struct {
float xFilesFactor; float xFilesFactor;
int8_t delayUnit; int32_t delay;
int8_t nFuncIds; int8_t nFuncIds;
int32_t* pFuncIds; func_id_t* pFuncIds;
int64_t delay;
} SRSmaParam; } SRSmaParam;
typedef struct SVCreateTbReq { typedef struct SVCreateTbReq {

View File

@ -31,6 +31,7 @@ typedef int16_t col_id_t;
typedef int8_t col_type_t; typedef int8_t col_type_t;
typedef int32_t col_bytes_t; typedef int32_t col_bytes_t;
typedef uint16_t schema_ver_t; typedef uint16_t schema_ver_t;
typedef int32_t func_id_t;
#pragma pack(push, 1) #pragma pack(push, 1)
typedef struct { typedef struct {

View File

@ -152,7 +152,8 @@ typedef struct SResultColumn {
typedef struct SReqResultInfo { typedef struct SReqResultInfo {
const char* pRspMsg; const char* pRspMsg;
const char* pData; const char* pData;
TAOS_FIELD* fields; TAOS_FIELD* fields; // todo, column names are not needed.
TAOS_FIELD* userFields; // the fields info that return to user
uint32_t numOfCols; uint32_t numOfCols;
int32_t* length; int32_t* length;
char** convertBuf; char** convertBuf;
@ -221,6 +222,7 @@ void destroyRequest(SRequestObj* pRequest);
char* getDbOfConnection(STscObj* pObj); char* getDbOfConnection(STscObj* pObj);
void setConnectionDB(STscObj* pTscObj, const char* db); void setConnectionDB(STscObj* pTscObj, const char* db);
void resetConnectDB(STscObj* pTscObj);
void taos_init_imp(void); void taos_init_imp(void);
int taos_options_imp(TSDB_OPTION option, const char* str); int taos_options_imp(TSDB_OPTION option, const char* str);

View File

@ -169,6 +169,7 @@ static void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
taosMemoryFreeClear(pResInfo->row); taosMemoryFreeClear(pResInfo->row);
taosMemoryFreeClear(pResInfo->pCol); taosMemoryFreeClear(pResInfo->pCol);
taosMemoryFreeClear(pResInfo->fields); taosMemoryFreeClear(pResInfo->fields);
taosMemoryFreeClear(pResInfo->userFields);
if (pResInfo->convertBuf != NULL) { if (pResInfo->convertBuf != NULL) {
for (int32_t i = 0; i < pResInfo->numOfCols; ++i) { for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {

View File

@ -56,7 +56,7 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass,
} }
char localDb[TSDB_DB_NAME_LEN] = {0}; char localDb[TSDB_DB_NAME_LEN] = {0};
if (db != NULL) { if (db != NULL && strlen(db) > 0) {
if (!validateDbName(db)) { if (!validateDbName(db)) {
terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH; terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
return NULL; return NULL;
@ -164,6 +164,7 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery) {
if ((*pQuery)->haveResultSet) { if ((*pQuery)->haveResultSet) {
setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols); setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols);
} }
TSWAP(pRequest->dbList, (*pQuery)->pDbList, SArray*); TSWAP(pRequest->dbList, (*pQuery)->pDbList, SArray*);
TSWAP(pRequest->tableList, (*pQuery)->pTableList, SArray*); TSWAP(pRequest->tableList, (*pQuery)->pTableList, SArray*);
} }
@ -228,12 +229,24 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
assert(pSchema != NULL && numOfCols > 0); assert(pSchema != NULL && numOfCols > 0);
pResInfo->numOfCols = numOfCols; pResInfo->numOfCols = numOfCols;
pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(pSchema[0])); pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
for (int32_t i = 0; i < pResInfo->numOfCols; ++i) { for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
pResInfo->fields[i].bytes = pSchema[i].bytes; pResInfo->fields[i].bytes = pSchema[i].bytes;
pResInfo->fields[i].type = pSchema[i].type; pResInfo->fields[i].type = pSchema[i].type;
pResInfo->userFields[i].bytes = pSchema[i].bytes;
pResInfo->userFields[i].type = pSchema[i].type;
if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR) {
pResInfo->userFields[i].bytes -= VARSTR_HEADER_SIZE;
} else if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
pResInfo->userFields[i].bytes = (pResInfo->userFields[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
}
tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name)); tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
tstrncpy(pResInfo->userFields[i].name, pSchema[i].name, tListLen(pResInfo->userFields[i].name));
} }
} }
@ -791,6 +804,16 @@ void setConnectionDB(STscObj* pTscObj, const char* db) {
taosThreadMutexUnlock(&pTscObj->mutex); taosThreadMutexUnlock(&pTscObj->mutex);
} }
void resetConnectDB(STscObj* pTscObj) {
if (pTscObj == NULL) {
return;
}
taosThreadMutexLock(&pTscObj->mutex);
pTscObj->db[0] = 0;
taosThreadMutexUnlock(&pTscObj->mutex);
}
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) { int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) {
assert(pResultInfo != NULL && pRsp != NULL); assert(pResultInfo != NULL && pRsp != NULL);

View File

@ -146,7 +146,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
} }
SReqResultInfo *pResInfo = &(((SRequestObj *)res)->body.resInfo); SReqResultInfo *pResInfo = &(((SRequestObj *)res)->body.resInfo);
return pResInfo->fields; return pResInfo->userFields;
} }
TAOS_RES *taos_query(TAOS *taos, const char *sql) { TAOS_RES *taos_query(TAOS *taos, const char *sql) {
@ -365,8 +365,7 @@ bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
} }
bool taos_is_update_query(TAOS_RES *res) { bool taos_is_update_query(TAOS_RES *res) {
// TODO return taos_num_fields(res) == 0;
return true;
} }
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
@ -393,8 +392,11 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
int taos_validate_sql(TAOS *taos, const char *sql) { return true; } int taos_validate_sql(TAOS *taos, const char *sql) { return true; }
void taos_reset_current_db(TAOS *taos) { void taos_reset_current_db(TAOS *taos) {
// TODO if (taos == NULL) {
return; return;
}
resetConnectDB(taos);
} }
const char *taos_get_server_info(TAOS *taos) { const char *taos_get_server_info(TAOS *taos) {

View File

@ -911,7 +911,7 @@ CREATE_MSG_FAIL:
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
/*printf("call update ep %d\n", epoch);*/ /*printf("call update ep %d\n", epoch);*/
/*printf("tmq update ep epoch %d to epoch %d\n", tmq->epoch, epoch);*/ tscDebug("tmq update ep epoch %d to epoch %d", tmq->epoch, epoch);
bool set = false; bool set = false;
int32_t topicNumGet = taosArrayGetSize(pRsp->topics); int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
char vgKey[TSDB_TOPIC_FNAME_LEN + 22]; char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
@ -984,6 +984,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
tmq_t* tmq = pParam->tmq; tmq_t* tmq = pParam->tmq;
tscDebug("consumer %ld recv ep", tmq->consumerId);
if (code != 0) { if (code != 0) {
tscError("get topic endpoint error, not ready, wait:%d\n", pParam->sync); tscError("get topic endpoint error, not ready, wait:%d\n", pParam->sync);
goto END; goto END;
@ -1032,12 +1033,14 @@ END:
int32_t tmqAskEp(tmq_t* tmq, bool sync) { int32_t tmqAskEp(tmq_t* tmq, bool sync) {
int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1); int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
if (epStatus == 1) { if (epStatus == 1) {
tscDebug("consumer %ld skip ask ep", tmq->consumerId);
return 0; return 0;
} }
int32_t tlen = sizeof(SMqCMGetSubEpReq); int32_t tlen = sizeof(SMqCMGetSubEpReq);
SMqCMGetSubEpReq* req = taosMemoryMalloc(tlen); SMqCMGetSubEpReq* req = taosMemoryMalloc(tlen);
if (req == NULL) { if (req == NULL) {
tscError("failed to malloc get subscribe ep buf"); tscError("failed to malloc get subscribe ep buf");
atomic_store_8(&tmq->epStatus, 0);
return -1; return -1;
} }
req->consumerId = htobe64(tmq->consumerId); req->consumerId = htobe64(tmq->consumerId);
@ -1048,6 +1051,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
if (pParam == NULL) { if (pParam == NULL) {
tscError("failed to malloc subscribe param"); tscError("failed to malloc subscribe param");
taosMemoryFree(req); taosMemoryFree(req);
atomic_store_8(&tmq->epStatus, 0);
return -1; return -1;
} }
pParam->tmq = tmq; pParam->tmq = tmq;
@ -1059,6 +1063,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
tsem_destroy(&pParam->rspSem); tsem_destroy(&pParam->rspSem);
taosMemoryFree(pParam); taosMemoryFree(pParam);
taosMemoryFree(req); taosMemoryFree(req);
atomic_store_8(&tmq->epStatus, 0);
return -1; return -1;
} }
@ -1076,6 +1081,8 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
tscDebug("consumer %ld ask ep", tmq->consumerId);
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
@ -1214,7 +1221,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
if (vgStatus != TMQ_VG_STATUS__IDLE) { if (vgStatus != TMQ_VG_STATUS__IDLE) {
tscDebug("skip vg %d", pVg->vgId); tscDebug("consumer %ld skip vg %d", tmq->consumerId, pVg->vgId);
continue; continue;
} }
SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg); SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
@ -1258,7 +1265,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
int64_t transporterId = 0; int64_t transporterId = 0;
/*printf("send poll\n");*/ /*printf("send poll\n");*/
atomic_add_fetch_32(&tmq->waitingRequest, 1); atomic_add_fetch_32(&tmq->waitingRequest, 1);
tscDebug("tmq send poll: vg %d, req offset %ld", pVg->vgId, pVg->currentOffset); tscDebug("consumer %ld send poll: vg %d, req offset %ld", tmq->consumerId, pVg->vgId, pVg->currentOffset);
/*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/ /*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
pVg->pollCnt++; pVg->pollCnt++;
@ -1322,7 +1329,7 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese
tmqHandleNoPollRsp(tmq, rspHead, &reset); tmqHandleNoPollRsp(tmq, rspHead, &reset);
taosFreeQitem(rspHead); taosFreeQitem(rspHead);
if (pollIfReset && reset) { if (pollIfReset && reset) {
tscDebug("reset and repoll\n"); tscDebug("consumer %ld reset and repoll", tmq->consumerId);
tmqPollImpl(tmq, blockingTime); tmqPollImpl(tmq, blockingTime);
} }
} }
@ -1561,24 +1568,3 @@ TAOS_ROW tmq_get_row(tmq_message_t* message) {
} }
char* tmq_get_topic_name(tmq_message_t* message) { return "not implemented yet"; } char* tmq_get_topic_name(tmq_message_t* message) { return "not implemented yet"; }
#if 0
tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) {
tmq_t* pTmq = taosMemoryMalloc(sizeof(tmq_t));
if (pTmq == NULL) {
return NULL;
}
strcpy(pTmq->groupId, conf->groupId);
strcpy(pTmq->clientId, conf->clientId);
pTmq->pTscObj = (STscObj*)conn;
pTmq->pTscObj->connType = HEARTBEAT_TYPE_MQ;
return pTmq;
}
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
assert(pMsgBody != NULL);
taosMemoryFreeClear(pMsgBody->msgInfo.pData);
taosMemoryFreeClear(pMsgBody);
}
#endif

View File

@ -314,13 +314,12 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
} }
if (pReq->rollup && pReq->stbCfg.pRSmaParam) { if (pReq->rollup && pReq->stbCfg.pRSmaParam) {
SRSmaParam *param = pReq->stbCfg.pRSmaParam; SRSmaParam *param = pReq->stbCfg.pRSmaParam;
tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor); tlen += taosEncodeBinary(buf, (const void *)&param->xFilesFactor, sizeof(param->xFilesFactor));
tlen += taosEncodeFixedI8(buf, param->delayUnit); tlen += taosEncodeFixedI32(buf, param->delay);
tlen += taosEncodeFixedI8(buf, param->nFuncIds); tlen += taosEncodeFixedI8(buf, param->nFuncIds);
for (int8_t i = 0; i < param->nFuncIds; ++i) { for (int8_t i = 0; i < param->nFuncIds; ++i) {
tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]); tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]);
} }
tlen += taosEncodeFixedI64(buf, param->delay);
} }
break; break;
case TD_CHILD_TABLE: case TD_CHILD_TABLE:
@ -339,13 +338,12 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
} }
if (pReq->rollup && pReq->ntbCfg.pRSmaParam) { if (pReq->rollup && pReq->ntbCfg.pRSmaParam) {
SRSmaParam *param = pReq->ntbCfg.pRSmaParam; SRSmaParam *param = pReq->ntbCfg.pRSmaParam;
tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor); tlen += taosEncodeBinary(buf, (const void *)&param->xFilesFactor, sizeof(param->xFilesFactor));
tlen += taosEncodeFixedI8(buf, param->delayUnit); tlen += taosEncodeFixedI32(buf, param->delay);
tlen += taosEncodeFixedI8(buf, param->nFuncIds); tlen += taosEncodeFixedI8(buf, param->nFuncIds);
for (int8_t i = 0; i < param->nFuncIds; ++i) { for (int8_t i = 0; i < param->nFuncIds; ++i) {
tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]); tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]);
} }
tlen += taosEncodeFixedI64(buf, param->delay);
} }
break; break;
default: default:
@ -387,17 +385,17 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
if (pReq->rollup) { if (pReq->rollup) {
pReq->stbCfg.pRSmaParam = (SRSmaParam *)taosMemoryMalloc(sizeof(SRSmaParam)); pReq->stbCfg.pRSmaParam = (SRSmaParam *)taosMemoryMalloc(sizeof(SRSmaParam));
SRSmaParam *param = pReq->stbCfg.pRSmaParam; SRSmaParam *param = pReq->stbCfg.pRSmaParam;
buf = taosDecodeFixedU32(buf, (uint32_t *)&param->xFilesFactor); buf = taosDecodeBinaryTo(buf, (void*)&param->xFilesFactor, sizeof(param->xFilesFactor));
buf = taosDecodeFixedI8(buf, &param->delayUnit); buf = taosDecodeFixedI32(buf, &param->delay);
buf = taosDecodeFixedI8(buf, &param->nFuncIds); buf = taosDecodeFixedI8(buf, &param->nFuncIds);
if (param->nFuncIds > 0) { if (param->nFuncIds > 0) {
param->pFuncIds = (func_id_t *)taosMemoryMalloc(param->nFuncIds * sizeof(func_id_t));
for (int8_t i = 0; i < param->nFuncIds; ++i) { for (int8_t i = 0; i < param->nFuncIds; ++i) {
buf = taosDecodeFixedI32(buf, param->pFuncIds + i); buf = taosDecodeFixedI32(buf, param->pFuncIds + i);
} }
} else { } else {
param->pFuncIds = NULL; param->pFuncIds = NULL;
} }
buf = taosDecodeFixedI64(buf, &param->delay);
} else { } else {
pReq->stbCfg.pRSmaParam = NULL; pReq->stbCfg.pRSmaParam = NULL;
} }
@ -420,17 +418,17 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
if (pReq->rollup) { if (pReq->rollup) {
pReq->ntbCfg.pRSmaParam = (SRSmaParam *)taosMemoryMalloc(sizeof(SRSmaParam)); pReq->ntbCfg.pRSmaParam = (SRSmaParam *)taosMemoryMalloc(sizeof(SRSmaParam));
SRSmaParam *param = pReq->ntbCfg.pRSmaParam; SRSmaParam *param = pReq->ntbCfg.pRSmaParam;
buf = taosDecodeFixedU32(buf, (uint32_t *)&param->xFilesFactor); buf = taosDecodeBinaryTo(buf, (void*)&param->xFilesFactor, sizeof(param->xFilesFactor));
buf = taosDecodeFixedI8(buf, &param->delayUnit); buf = taosDecodeFixedI32(buf, &param->delay);
buf = taosDecodeFixedI8(buf, &param->nFuncIds); buf = taosDecodeFixedI8(buf, &param->nFuncIds);
if (param->nFuncIds > 0) { if (param->nFuncIds > 0) {
param->pFuncIds = (func_id_t *)taosMemoryMalloc(param->nFuncIds * sizeof(func_id_t));
for (int8_t i = 0; i < param->nFuncIds; ++i) { for (int8_t i = 0; i < param->nFuncIds; ++i) {
buf = taosDecodeFixedI32(buf, param->pFuncIds + i); buf = taosDecodeFixedI32(buf, param->pFuncIds + i);
} }
} else { } else {
param->pFuncIds = NULL; param->pFuncIds = NULL;
} }
buf = taosDecodeFixedI64(buf, &param->delay);
} else { } else {
pReq->ntbCfg.pRSmaParam = NULL; pReq->ntbCfg.pRSmaParam = NULL;
} }

View File

@ -354,6 +354,7 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
req.name = (char *)tNameGetTableName(&name); req.name = (char *)tNameGetTableName(&name);
req.ttl = 0; req.ttl = 0;
req.keep = 0; req.keep = 0;
req.rollup = pStb->aggregationMethod > -1 ? 1 : 0;
req.type = TD_SUPER_TABLE; req.type = TD_SUPER_TABLE;
req.stbCfg.suid = pStb->uid; req.stbCfg.suid = pStb->uid;
req.stbCfg.nCols = pStb->numOfColumns; req.stbCfg.nCols = pStb->numOfColumns;
@ -405,9 +406,38 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
} }
} }
SRSmaParam *pRSmaParam = NULL;
if (req.rollup) {
pRSmaParam = (SRSmaParam *)taosMemoryCalloc(1, sizeof(SRSmaParam));
if (pRSmaParam == NULL) {
taosMemoryFreeClear(req.stbCfg.pSchema);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pRSmaParam->xFilesFactor = pStb->xFilesFactor;
pRSmaParam->delay = pStb->delay;
pRSmaParam->nFuncIds = 1; // only 1 aggregation method supported currently
pRSmaParam->pFuncIds = (func_id_t *)taosMemoryCalloc(pRSmaParam->nFuncIds, sizeof(func_id_t));
if (pRSmaParam->pFuncIds == NULL) {
taosMemoryFreeClear(req.stbCfg.pRSmaParam);
taosMemoryFreeClear(req.stbCfg.pSchema);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
for (int32_t f = 0; f < pRSmaParam->nFuncIds; ++f) {
*(pRSmaParam->pFuncIds + f) = pStb->aggregationMethod;
}
req.stbCfg.pRSmaParam = pRSmaParam;
}
int32_t contLen = tSerializeSVCreateTbReq(NULL, &req) + sizeof(SMsgHead); int32_t contLen = tSerializeSVCreateTbReq(NULL, &req) + sizeof(SMsgHead);
SMsgHead *pHead = taosMemoryMalloc(contLen); SMsgHead *pHead = taosMemoryMalloc(contLen);
if (pHead == NULL) { if (pHead == NULL) {
if (pRSmaParam) {
taosMemoryFreeClear(pRSmaParam->pFuncIds);
taosMemoryFreeClear(pRSmaParam);
}
taosMemoryFreeClear(req.stbCfg.pSchema); taosMemoryFreeClear(req.stbCfg.pSchema);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
@ -420,6 +450,10 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
tSerializeSVCreateTbReq(&pBuf, &req); tSerializeSVCreateTbReq(&pBuf, &req);
*pContLen = contLen; *pContLen = contLen;
if (pRSmaParam) {
taosMemoryFreeClear(pRSmaParam->pFuncIds);
taosMemoryFreeClear(pRSmaParam);
}
taosMemoryFreeClear(req.stbCfg.pSchema); taosMemoryFreeClear(req.stbCfg.pSchema);
return pHead; return pHead;
} }
@ -632,6 +666,9 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre
stbObj.dbUid = pDb->uid; stbObj.dbUid = pDb->uid;
stbObj.version = 1; stbObj.version = 1;
stbObj.nextColId = 1; stbObj.nextColId = 1;
stbObj.xFilesFactor = pCreate->xFilesFactor;
stbObj.aggregationMethod = pCreate->aggregationMethod;
stbObj.delay = pCreate->delay;
stbObj.ttl = pCreate->ttl; stbObj.ttl = pCreate->ttl;
stbObj.numOfColumns = pCreate->numOfColumns; stbObj.numOfColumns = pCreate->numOfColumns;
stbObj.numOfTags = pCreate->numOfTags; stbObj.numOfTags = pCreate->numOfTags;

View File

@ -507,7 +507,7 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
// TODO: log rebalance statistics // TODO: log rebalance statistics
SSdbRaw *pSubRaw = mndSubActionEncode(pSub); SSdbRaw *pSubRaw = mndSubActionEncode(pSub);
sdbSetRawStatus(pSubRaw, SDB_STATUS_UPDATING); sdbSetRawStatus(pSubRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pSubRaw); mndTransAppendRedolog(pTrans, pSubRaw);
} }
mndReleaseSubscribe(pMnode, pSub); mndReleaseSubscribe(pMnode, pSub);

View File

@ -40,6 +40,10 @@ const char *sdbTableName(ESdbType type) {
return "auth"; return "auth";
case SDB_ACCT: case SDB_ACCT:
return "acct"; return "acct";
case SDB_STREAM:
return "stream";
case SDB_OFFSET:
return "offset";
case SDB_SUBSCRIBE: case SDB_SUBSCRIBE:
return "subscribe"; return "subscribe";
case SDB_CONSUMER: case SDB_CONSUMER:

View File

@ -78,10 +78,13 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// TODO: handle error // TODO: handle error
} }
// TODO: maybe need to clear the request struct // TODO: to encapsule a free API
taosMemoryFree(vCreateTbReq.stbCfg.pSchema); taosMemoryFree(vCreateTbReq.stbCfg.pSchema);
taosMemoryFree(vCreateTbReq.stbCfg.pTagSchema); taosMemoryFree(vCreateTbReq.stbCfg.pTagSchema);
if(vCreateTbReq.stbCfg.pRSmaParam) {
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam->pFuncIds);
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam); taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam);
}
taosMemoryFree(vCreateTbReq.dbFName); taosMemoryFree(vCreateTbReq.dbFName);
taosMemoryFree(vCreateTbReq.name); taosMemoryFree(vCreateTbReq.name);
break; break;
@ -110,19 +113,26 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// TODO: handle error // TODO: handle error
vError("vgId:%d, failed to create table: %s", pVnode->vgId, pCreateTbReq->name); vError("vgId:%d, failed to create table: %s", pVnode->vgId, pCreateTbReq->name);
} }
// TODO: to encapsule a free API
taosMemoryFree(pCreateTbReq->name); taosMemoryFree(pCreateTbReq->name);
taosMemoryFree(pCreateTbReq->dbFName); taosMemoryFree(pCreateTbReq->dbFName);
if (pCreateTbReq->type == TD_SUPER_TABLE) { if (pCreateTbReq->type == TD_SUPER_TABLE) {
taosMemoryFree(pCreateTbReq->stbCfg.pSchema); taosMemoryFree(pCreateTbReq->stbCfg.pSchema);
taosMemoryFree(pCreateTbReq->stbCfg.pTagSchema); taosMemoryFree(pCreateTbReq->stbCfg.pTagSchema);
if (pCreateTbReq->stbCfg.pRSmaParam) {
taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam->pFuncIds);
taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam); taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam);
}
} else if (pCreateTbReq->type == TD_CHILD_TABLE) { } else if (pCreateTbReq->type == TD_CHILD_TABLE) {
taosMemoryFree(pCreateTbReq->ctbCfg.pTag); taosMemoryFree(pCreateTbReq->ctbCfg.pTag);
} else { } else {
taosMemoryFree(pCreateTbReq->ntbCfg.pSchema); taosMemoryFree(pCreateTbReq->ntbCfg.pSchema);
if (pCreateTbReq->ntbCfg.pRSmaParam) {
taosMemoryFree(pCreateTbReq->ntbCfg.pRSmaParam->pFuncIds);
taosMemoryFree(pCreateTbReq->ntbCfg.pRSmaParam); taosMemoryFree(pCreateTbReq->ntbCfg.pRSmaParam);
} }
} }
}
vTrace("vgId:%d process create %" PRIzu " tables", pVnode->vgId, taosArrayGetSize(vCreateTbBatchReq.pArray)); vTrace("vgId:%d process create %" PRIzu " tables", pVnode->vgId, taosArrayGetSize(vCreateTbBatchReq.pArray));
taosArrayDestroy(vCreateTbBatchReq.pArray); taosArrayDestroy(vCreateTbBatchReq.pArray);
@ -145,9 +155,13 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
SVCreateTbReq vAlterTbReq = {0}; SVCreateTbReq vAlterTbReq = {0};
vTrace("vgId:%d, process alter stb req", pVnode->vgId); vTrace("vgId:%d, process alter stb req", pVnode->vgId);
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vAlterTbReq); tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vAlterTbReq);
// TODO: to encapsule a free API
taosMemoryFree(vAlterTbReq.stbCfg.pSchema); taosMemoryFree(vAlterTbReq.stbCfg.pSchema);
taosMemoryFree(vAlterTbReq.stbCfg.pTagSchema); taosMemoryFree(vAlterTbReq.stbCfg.pTagSchema);
if (vAlterTbReq.stbCfg.pRSmaParam) {
taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam->pFuncIds);
taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam); taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam);
}
taosMemoryFree(vAlterTbReq.dbFName); taosMemoryFree(vAlterTbReq.dbFName);
taosMemoryFree(vAlterTbReq.name); taosMemoryFree(vAlterTbReq.name);
break; break;

View File

@ -393,6 +393,7 @@ typedef struct STableScanInfo {
int32_t times; // repeat counts int32_t times; // repeat counts
int32_t current; int32_t current;
int32_t reverseTimes; // 0 by default int32_t reverseTimes; // 0 by default
SNode* pFilterNode; // filter operator info
SqlFunctionCtx* pCtx; // next operator query context SqlFunctionCtx* pCtx; // next operator query context
SResultRowInfo* pResultRowInfo; SResultRowInfo* pResultRowInfo;
int32_t* rowCellInfoOffset; int32_t* rowCellInfoOffset;
@ -501,12 +502,6 @@ typedef struct SProjectOperatorInfo {
int64_t curOutput; int64_t curOutput;
} SProjectOperatorInfo; } SProjectOperatorInfo;
typedef struct SLimitOperatorInfo {
SLimit limit;
int64_t currentOffset;
int64_t currentRows;
} SLimitOperatorInfo;
typedef struct SSLimitOperatorInfo { typedef struct SSLimitOperatorInfo {
int64_t groupTotal; int64_t groupTotal;
int64_t currentGroupOffset; int64_t currentGroupOffset;
@ -525,11 +520,6 @@ typedef struct SSLimitOperatorInfo {
int64_t threshold; int64_t threshold;
} SSLimitOperatorInfo; } SSLimitOperatorInfo;
typedef struct SFilterOperatorInfo {
SSingleColumnFilterInfo* pFilterInfo;
int32_t numOfFilterCols;
} SFilterOperatorInfo;
typedef struct SFillOperatorInfo { typedef struct SFillOperatorInfo {
struct SFillInfo* pFillInfo; struct SFillInfo* pFillInfo;
SSDataBlock* pRes; SSDataBlock* pRes;
@ -639,7 +629,7 @@ typedef struct SDistinctOperatorInfo {
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime, SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime,
int32_t reverseTime, SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo); int32_t reverseTime, SArray* pColMatchInfo, SNode* pCondition, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
@ -682,22 +672,12 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema,
int32_t numOfOutput); int32_t numOfOutput);
void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols);
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput); void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput);
void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity);
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput); void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
int32_t createQueryFilter(char* data, uint16_t len, SFilterInfo** pFilters);
int32_t createFilterInfo(STaskAttr* pQueryAttr, uint64_t qId);
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);
STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow win); STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow win);
STableQueryInfo* createTmpTableQueryInfo(STimeWindow win);
bool isTaskKilled(SExecTaskInfo* pTaskInfo); bool isTaskKilled(SExecTaskInfo* pTaskInfo);
int32_t checkForQueryBuf(size_t numOfTables); int32_t checkForQueryBuf(size_t numOfTables);

View File

@ -194,9 +194,6 @@ static void getNextTimeWindow(SInterval* pInterval, int32_t precision, int32_t o
} }
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes); static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes);
static void setResultOutputBuf(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResult, SqlFunctionCtx* pCtx,
int32_t numOfCols, int32_t* rowCellInfoOffset);
void setResultRowOutputBufInitCtx(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResult, SqlFunctionCtx* pCtx, void setResultRowOutputBufInitCtx(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResult, SqlFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowCellInfoOffset); int32_t numOfOutput, int32_t* rowCellInfoOffset);
static bool functionNeedToExecute(SqlFunctionCtx* pCtx); static bool functionNeedToExecute(SqlFunctionCtx* pCtx);
@ -214,8 +211,6 @@ static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
// static STsdbQueryCond createTsdbQueryCond(STaskAttr* pQueryAttr, STimeWindow* win); // static STsdbQueryCond createTsdbQueryCond(STaskAttr* pQueryAttr, STimeWindow* win);
static STableIdInfo createTableIdInfo(STableQueryInfo* pTableQueryInfo); static STableIdInfo createTableIdInfo(STableQueryInfo* pTableQueryInfo);
static void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream);
static int32_t getNumOfScanTimes(STaskAttr* pQueryAttr); static int32_t getNumOfScanTimes(STaskAttr* pQueryAttr);
static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
@ -264,10 +259,6 @@ static void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision,
static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo); static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo);
static void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable); static void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable);
static void setParamForStableStddev(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput,
SExprInfo* pExpr);
static void setParamForStableStddevByColData(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput,
SExprInfo* pExpr, char* val, int16_t bytes);
static void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, int32_t tableGroupId, static void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, int32_t tableGroupId,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
@ -3025,6 +3016,23 @@ int32_t loadDataBlock(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo,
taosArraySet(pBlock->pDataBlock, pColMatchInfo->targetSlotId, p); taosArraySet(pBlock->pDataBlock, pColMatchInfo->targetSlotId, p);
} }
if (pTableScanInfo->pFilterNode != NULL) {
SFilterInfo* filter = NULL;
int32_t code = filterInitFromNode((SNode*)pTableScanInfo->pFilterNode, &filter, 0);
SFilterColumnParam param1 = {.numOfCols = pBlock->info.numOfCols, .pDataBlock = pBlock->pDataBlock};
code = filterSetDataFromSlotId(filter, &param1);
int8_t* rowRes = NULL;
bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols);
// filterSetColFieldData(pQueryAttr->pFilters, pBlock->info.numOfCols, pBlock->pDataBlock);
// if (pQueryAttr->pFilters != NULL) {
// filterColRowsInDataBlock(pRuntimeEnv, pBlock, ascQuery);
// }
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -3344,11 +3352,6 @@ void setTagValue(SOperatorInfo* pOperatorInfo, void* pTable, SqlFunctionCtx* pCt
offset += pLocalExprInfo->base.resSchema.bytes; offset += pLocalExprInfo->base.resSchema.bytes;
} }
// todo : use index to avoid iterator all possible output columns
if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) {
setParamForStableStddev(pRuntimeEnv, pCtx, numOfOutput, pExprInfo);
}
} }
// set the tsBuf start position before check each data block // set the tsBuf start position before check each data block
@ -3544,19 +3547,6 @@ void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput)
} }
} }
void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity) {
SSDataBlock* pDataBlock = pBInfo->pRes;
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock->pDataBlock, i);
int32_t functionId = pBInfo->pCtx[i].functionId;
if (functionId < 0) {
memset(pBInfo->pCtx[i].pOutput, 0, pColInfo->info.bytes * (*bufCapacity));
}
}
}
void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) { void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) {
for (int32_t j = 0; j < size; ++j) { for (int32_t j = 0; j < size; ++j) {
struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[j]); struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
@ -3714,23 +3704,6 @@ STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow
return pTableQueryInfo; return pTableQueryInfo;
} }
STableQueryInfo* createTmpTableQueryInfo(STimeWindow win) {
STableQueryInfo* pTableQueryInfo = taosMemoryCalloc(1, sizeof(STableQueryInfo));
// pTableQueryInfo->win = win;
pTableQueryInfo->lastKey = win.skey;
// set more initial size of interval/groupby query
int32_t initialSize = 16;
int32_t code = initResultRowInfo(&pTableQueryInfo->resInfo, initialSize);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pTableQueryInfo);
return NULL;
}
return pTableQueryInfo;
}
void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) { void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) {
if (pTableQueryInfo == NULL) { if (pTableQueryInfo == NULL) {
return; return;
@ -3834,30 +3807,6 @@ void setExecutionContext(int32_t numOfOutput, int32_t tableGroupId, TSKEY nextKe
pAggInfo->groupId = tableGroupId; pAggInfo->groupId = tableGroupId;
} }
void setResultOutputBuf(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfCols,
int32_t* rowCellInfoOffset) {
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
SFilePage* page = getBufPage(pRuntimeEnv->pResultBuf, pResult->pageId);
int16_t offset = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv->pQueryAttr, page, pResult->offset, offset);
offset += pCtx[i].resDataInfo.bytes;
int32_t functionId = pCtx[i].functionId;
if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF ||
functionId == FUNCTION_DERIVATIVE) {
if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i - 1].pOutput;
}
/*
* set the output buffer information and intermediate buffer,
* not all queries require the interResultBuf, such as COUNT
*/
pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset);
}
}
void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable) { void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
@ -4626,73 +4575,6 @@ static int32_t setupQueryHandle(void* tsdb, STaskRuntimeEnv* pRuntimeEnv, int64_
return terrno; return terrno;
} }
int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr, int32_t tbScanner, SArray* pOperator,
void* param) {
STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
STaskAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
pQueryAttr->tsdb = tsdb;
if (tsdb != NULL) {
int32_t code = setupQueryHandle(tsdb, pRuntimeEnv, pQInfo->qId, pQueryAttr->stableQuery);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
pQueryAttr->interBufSize = getOutputInterResultBufSize(pQueryAttr);
pRuntimeEnv->groupResInfo.totalGroup = (int32_t)(pQueryAttr->stableQuery ? GET_NUM_OF_TABLEGROUP(pRuntimeEnv) : 0);
pRuntimeEnv->enableGroupData = false;
pRuntimeEnv->pQueryAttr = pQueryAttr;
pRuntimeEnv->pTsBuf = pTsBuf;
pRuntimeEnv->cur.vgroupIndex = -1;
setResultBufSize(pQueryAttr, &pRuntimeEnv->resultInfo);
if (sourceOptr != NULL) {
assert(pRuntimeEnv->proot == NULL);
pRuntimeEnv->proot = sourceOptr;
}
if (pTsBuf != NULL) {
int16_t order = (pQueryAttr->order.order == pRuntimeEnv->pTsBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
tsBufSetTraverseOrder(pRuntimeEnv->pTsBuf, order);
}
int32_t ps = 4096;
getIntermediateBufInfo(pRuntimeEnv, &ps, &pQueryAttr->intermediateResultRowSize);
int32_t TENMB = 1024 * 1024 * 10;
int32_t code = createDiskbasedBuf(&pRuntimeEnv->pResultBuf, ps, TENMB, "", "/tmp");
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// create runtime environment
int32_t numOfTables = (int32_t)pQueryAttr->tableGroupInfo.numOfTables;
pQInfo->summary.tableInfoSize += (numOfTables * sizeof(STableQueryInfo));
pQInfo->summary.queryProfEvents = taosArrayInit(512, sizeof(SQueryProfEvent));
if (pQInfo->summary.queryProfEvents == NULL) {
// qDebug("QInfo:0x%"PRIx64" failed to allocate query prof events array", pQInfo->qId);
}
pQInfo->summary.operatorProfResults =
taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TINYINT), true, HASH_NO_LOCK);
if (pQInfo->summary.operatorProfResults == NULL) {
// qDebug("QInfo:0x%"PRIx64" failed to allocate operator prof results hash", pQInfo->qId);
}
code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t)pQueryAttr->tableGroupInfo.numOfTables, pOperator, param);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// setTaskStatus(pOperator->pTaskInfo, QUERY_NOT_COMPLETED);
return TSDB_CODE_SUCCESS;
}
static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQueryInfo* pTableQueryInfo, int32_t order) { static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQueryInfo* pTableQueryInfo, int32_t order) {
#if 0 #if 0
if (order == TSDB_ORDER_ASC) { if (order == TSDB_ORDER_ASC) {
@ -4790,7 +4672,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
// break; // break;
// } // }
// //
// pRuntimeEnv->current = *pTableQueryInfo;
// doTableQueryInfoTimeWindowCheck(pTaskInfo, *pTableQueryInfo, pTableScanInfo->order); // doTableQueryInfoTimeWindowCheck(pTaskInfo, *pTableQueryInfo, pTableScanInfo->order);
// } // }
@ -5548,7 +5429,7 @@ SSDataBlock* createResultDataBlock(const SArray* pExprInfo) {
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput,
int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo,
SExecTaskInfo* pTaskInfo) { SNode* pCondition, SExecTaskInfo* pTaskInfo) {
assert(repeatTime > 0); assert(repeatTime > 0);
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
@ -5567,6 +5448,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
taosArrayPush(pInfo->block.pDataBlock, &idata); taosArrayPush(pInfo->block.pDataBlock, &idata);
} }
pInfo->pFilterNode = pCondition;
pInfo->pTsdbReadHandle = pTsdbReadHandle; pInfo->pTsdbReadHandle = pTsdbReadHandle;
pInfo->times = repeatTime; pInfo->times = repeatTime;
pInfo->reverseTimes = reverseTime; pInfo->reverseTimes = reverseTime;
@ -5838,7 +5720,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) {
SColumnInfoData* pColInfoData = taosArrayGet(pInfo->pRes->pDataBlock, i); SColumnInfoData* pColInfoData = taosArrayGet(pInfo->pRes->pDataBlock, i);
int64_t tmp = 0; int64_t tmp = 0;
char t[10] = {0}; char t[10] = {0};
STR_TO_VARSTR(t, "_"); STR_TO_VARSTR(t, "_"); //TODO
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
colDataAppend(pColInfoData, numOfRows, t, false); colDataAppend(pColInfoData, numOfRows, t, false);
} else { } else {
@ -6890,58 +6772,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup)
return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL; return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL;
} }
static SSDataBlock* doLimit(SOperatorInfo* pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SLimitOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pBlock = NULL;
SOperatorInfo* pDownstream = pOperator->pDownstream[0];
while (1) {
publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pDownstream->getNextFn(pDownstream, newgroup);
publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
doSetOperatorCompleted(pOperator);
return NULL;
}
if (pInfo->currentOffset == 0) {
break;
} else if (pInfo->currentOffset >= pBlock->info.rows) {
pInfo->currentOffset -= pBlock->info.rows;
} else { // TODO handle the data movement
int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset);
pBlock->info.rows = remain;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
int16_t bytes = pColInfoData->info.bytes;
memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes);
}
pInfo->currentOffset = 0;
break;
}
}
if (pInfo->currentRows + pBlock->info.rows >= pInfo->limit.limit) {
pBlock->info.rows = (int32_t)(pInfo->limit.limit - pInfo->currentRows);
pInfo->currentRows = pInfo->limit.limit;
doSetOperatorCompleted(pOperator);
} else {
pInfo->currentRows += pBlock->info.rows;
}
return pBlock;
}
static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
if (OPTR_IS_OPENED(pOperator)) { if (OPTR_IS_OPENED(pOperator)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -7791,11 +7621,6 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pSortInfo); taosArrayDestroy(pInfo->pSortInfo);
} }
static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) {
SFilterOperatorInfo* pInfo = (SFilterOperatorInfo*)param;
doDestroyFilterInfo(pInfo->pFilterInfo, pInfo->numOfFilterCols);
}
static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) {
SDistinctOperatorInfo* pInfo = (SDistinctOperatorInfo*)param; SDistinctOperatorInfo* pInfo = (SDistinctOperatorInfo*)param;
taosHashCleanup(pInfo->pSet); taosHashCleanup(pInfo->pSet);
@ -8810,7 +8635,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols); SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count,
pScanPhyNode->reverse, pColList, pTaskInfo); pScanPhyNode->reverse, pColList, pScanPhyNode->node.pConditions, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) { } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode; SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createOutputBuf_rv1(pExchange->node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createOutputBuf_rv1(pExchange->node.pOutputDataBlockDesc);
@ -9175,41 +9000,6 @@ _complete:
return code; return code;
} }
int32_t cloneExprFilterInfo(SColumnFilterInfo** dst, SColumnFilterInfo* src, int32_t filterNum) {
if (filterNum <= 0) {
return TSDB_CODE_SUCCESS;
}
*dst = taosMemoryCalloc(filterNum, sizeof(*src));
if (*dst == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
memcpy(*dst, src, sizeof(*src) * filterNum);
for (int32_t i = 0; i < filterNum; i++) {
if ((*dst)[i].filterstr && dst[i]->len > 0) {
void* pz = taosMemoryCalloc(1, (size_t)(*dst)[i].len + 1);
if (pz == NULL) {
if (i == 0) {
taosMemoryFree(*dst);
} else {
freeColumnFilterInfo(*dst, i);
}
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
memcpy(pz, (void*)src->pz, (size_t)src->len + 1);
(*dst)[i].pz = (int64_t)pz;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SColumnInfo* pTagCols, SExprInfo* pExprs, static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SColumnInfo* pTagCols, SExprInfo* pExprs,
int32_t numOfOutput, int32_t tagLen, bool superTable) { int32_t numOfOutput, int32_t tagLen, bool superTable) {
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
@ -9232,113 +9022,6 @@ static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SCol
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// TODO tag length should be passed from client, refactor
int32_t createQueryFilter(char* data, uint16_t len, SFilterInfo** pFilters) {
tExprNode* expr = NULL;
TRY(TSDB_MAX_TAG_CONDITIONS) { expr = exprTreeFromBinary(data, len); }
CATCH(code) {
CLEANUP_EXECUTE();
return code;
}
END_TRY
if (expr == NULL) {
// qError("failed to create expr tree");
return TSDB_CODE_QRY_APP_ERROR;
}
// int32_t ret = filterInitFromTree(expr, pFilters, 0);
// tExprTreeDestroy(expr, NULL);
// return ret;
}
// int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo**
// pFilterInfo, uint64_t qId) {
// *pFilterInfo = taosMemoryCalloc(1, sizeof(SSingleColumnFilterInfo) * numOfFilterCols);
// if (*pFilterInfo == NULL) {
// return TSDB_CODE_QRY_OUT_OF_MEMORY;
// }
//
// for (int32_t i = 0, j = 0; i < numOfCols; ++i) {
// if (pCols[i].flist.numOfFilters > 0) {
// SSingleColumnFilterInfo* pFilter = &((*pFilterInfo)[j]);
//
// memcpy(&pFilter->info, &pCols[i], sizeof(SColumnInfo));
// pFilter->info = pCols[i];
//
// pFilter->numOfFilters = pCols[i].flist.numOfFilters;
// pFilter->pFilters = taosMemoryCalloc(pFilter->numOfFilters, sizeof(SColumnFilterElem));
// if (pFilter->pFilters == NULL) {
// return TSDB_CODE_QRY_OUT_OF_MEMORY;
// }
//
// for (int32_t f = 0; f < pFilter->numOfFilters; ++f) {
// SColumnFilterElem* pSingleColFilter = &pFilter->pFilters[f];
// pSingleColFilter->filterInfo = pCols[i].flist.filterInfo[f];
//
// int32_t lower = pSingleColFilter->filterInfo.lowerRelOptr;
// int32_t upper = pSingleColFilter->filterInfo.upperRelOptr;
// if (lower == TSDB_RELATION_INVALID && upper == TSDB_RELATION_INVALID) {
// //qError("QInfo:0x%"PRIx64" invalid filter info", qId);
// return TSDB_CODE_QRY_INVALID_MSG;
// }
//
// pSingleColFilter->fp = getFilterOperator(lower, upper);
// if (pSingleColFilter->fp == NULL) {
// //qError("QInfo:0x%"PRIx64" invalid filter info", qId);
// return TSDB_CODE_QRY_INVALID_MSG;
// }
//
// pSingleColFilter->bytes = pCols[i].bytes;
//
// if (lower == TSDB_RELATION_IN) {
//// buildFilterSetFromBinary(&pSingleColFilter->q, (char *)(pSingleColFilter->filterInfo.pz),
///(int32_t)(pSingleColFilter->filterInfo.len));
// }
// }
//
// j++;
// }
// }
//
// return TSDB_CODE_SUCCESS;
//}
void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols) {
// for (int32_t i = 0; i < numOfFilterCols; ++i) {
// if (pFilterInfo[i].numOfFilters > 0) {
// if (pFilterInfo[i].pFilters->filterInfo.lowerRelOptr == TSDB_RELATION_IN) {
// taosHashCleanup((SHashObj *)(pFilterInfo[i].pFilters->q));
// }
// taosMemoryFreeClear(pFilterInfo[i].pFilters);
// }
// }
//
// taosMemoryFreeClear(pFilterInfo);
return NULL;
}
int32_t createFilterInfo(STaskAttr* pQueryAttr, uint64_t qId) {
for (int32_t i = 0; i < pQueryAttr->numOfCols; ++i) {
// if (pQueryAttr->tableCols[i].flist.numOfFilters > 0 && pQueryAttr->tableCols[i].flist.filterInfo != NULL) {
// pQueryAttr->numOfFilterCols++;
// }
}
if (pQueryAttr->numOfFilterCols == 0) {
return TSDB_CODE_SUCCESS;
}
// doCreateFilterInfo(pQueryAttr->tableCols, pQueryAttr->numOfCols, pQueryAttr->numOfFilterCols,
// &pQueryAttr->pFilterInfo, qId);
pQueryAttr->createFilterOperator = true;
return TSDB_CODE_SUCCESS;
}
static void doUpdateExprColumnIndex(STaskAttr* pQueryAttr) { static void doUpdateExprColumnIndex(STaskAttr* pQueryAttr) {
assert(pQueryAttr->pExpr1 != NULL && pQueryAttr != NULL); assert(pQueryAttr->pExpr1 != NULL && pQueryAttr != NULL);

View File

@ -3668,7 +3668,6 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnData
taosArrayPush(pList, &pSrc); taosArrayPush(pList, &pSrc);
FLT_ERR_RET(scalarCalculate(info->sclCtx.node, pList, &output)); FLT_ERR_RET(scalarCalculate(info->sclCtx.node, pList, &output));
taosArrayDestroy(pList); taosArrayDestroy(pList);
// TODO Fix it // TODO Fix it
// *p = output.orig.data; // *p = output.orig.data;

View File

@ -275,7 +275,6 @@ TEST(timerangeTest, greater_and_lower) {
nodesDestroyNode(logicNode); nodesDestroyNode(logicNode);
} }
TEST(columnTest, smallint_column_greater_double_value) { TEST(columnTest, smallint_column_greater_double_value) {
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL; SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
int16_t leftv[5]= {1, 2, 3, 4, 5}; int16_t leftv[5]= {1, 2, 3, 4, 5};
@ -386,7 +385,6 @@ TEST(columnTest, int_column_greater_smallint_value) {
blockDataDestroy(src); blockDataDestroy(src);
} }
TEST(columnTest, int_column_in_double_list) { TEST(columnTest, int_column_in_double_list) {
SNode *pLeft = NULL, *pRight = NULL, *listNode = NULL, *opNode = NULL; SNode *pLeft = NULL, *pRight = NULL, *listNode = NULL, *opNode = NULL;
int32_t leftv[5] = {1, 2, 3, 4, 5}; int32_t leftv[5] = {1, 2, 3, 4, 5};
@ -432,8 +430,6 @@ TEST(columnTest, int_column_in_double_list) {
blockDataDestroy(src); blockDataDestroy(src);
} }
TEST(columnTest, binary_column_in_binary_list) { TEST(columnTest, binary_column_in_binary_list) {
SNode *pLeft = NULL, *pRight = NULL, *listNode = NULL, *opNode = NULL; SNode *pLeft = NULL, *pRight = NULL, *listNode = NULL, *opNode = NULL;
bool eRes[5] = {true, true, false, false, false}; bool eRes[5] = {true, true, false, false, false};
@ -497,7 +493,6 @@ TEST(columnTest, binary_column_in_binary_list) {
blockDataDestroy(src); blockDataDestroy(src);
} }
TEST(columnTest, binary_column_like_binary) { TEST(columnTest, binary_column_like_binary) {
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL; SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
char rightv[64] = {0}; char rightv[64] = {0};
@ -546,7 +541,6 @@ TEST(columnTest, binary_column_like_binary) {
blockDataDestroy(src); blockDataDestroy(src);
} }
TEST(columnTest, binary_column_is_null) { TEST(columnTest, binary_column_is_null) {
SNode *pLeft = NULL, *opNode = NULL; SNode *pLeft = NULL, *opNode = NULL;
char leftv[5][5]= {0}; char leftv[5][5]= {0};
@ -641,8 +635,6 @@ TEST(columnTest, binary_column_is_not_null) {
blockDataDestroy(src); blockDataDestroy(src);
} }
TEST(opTest, smallint_column_greater_int_column) { TEST(opTest, smallint_column_greater_int_column) {
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL; SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
int16_t leftv[5] = {1, -6, -2, 11, 101}; int16_t leftv[5] = {1, -6, -2, 11, 101};
@ -680,7 +672,6 @@ TEST(opTest, smallint_column_greater_int_column) {
blockDataDestroy(src); blockDataDestroy(src);
} }
TEST(opTest, smallint_value_add_int_column) { TEST(opTest, smallint_value_add_int_column) {
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL; SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
int32_t leftv = 1; int32_t leftv = 1;
@ -719,8 +710,6 @@ TEST(opTest, smallint_value_add_int_column) {
blockDataDestroy(src); blockDataDestroy(src);
} }
TEST(opTest, bigint_column_multi_binary_column) { TEST(opTest, bigint_column_multi_binary_column) {
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL; SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
int64_t leftv[5]= {1, 2, 3, 4, 5}; int64_t leftv[5]= {1, 2, 3, 4, 5};
@ -845,8 +834,6 @@ TEST(opTest, smallint_column_or_float_column) {
blockDataDestroy(src); blockDataDestroy(src);
} }
TEST(opTest, smallint_column_or_double_value) { TEST(opTest, smallint_column_or_double_value) {
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL; SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
int16_t leftv[5]= {0, 2, 3, 0, -1}; int16_t leftv[5]= {0, 2, 3, 0, -1};
@ -885,7 +872,6 @@ TEST(opTest, smallint_column_or_double_value) {
blockDataDestroy(src); blockDataDestroy(src);
} }
TEST(opTest, binary_column_is_true) { TEST(opTest, binary_column_is_true) {
SNode *pLeft = NULL, *opNode = NULL; SNode *pLeft = NULL, *opNode = NULL;
char leftv[5][5]= {0}; char leftv[5][5]= {0};
@ -930,7 +916,6 @@ TEST(opTest, binary_column_is_true) {
blockDataDestroy(src); blockDataDestroy(src);
} }
TEST(filterModelogicTest, diff_columns_and_or_and) { TEST(filterModelogicTest, diff_columns_and_or_and) {
flttInitLogFile(); flttInitLogFile();
@ -1071,7 +1056,6 @@ TEST(filterModelogicTest, same_column_and_or_and) {
blockDataDestroy(src); blockDataDestroy(src);
} }
TEST(filterModelogicTest, diff_columns_or_and_or) { TEST(filterModelogicTest, diff_columns_or_and_or) {
SNode *pLeft1 = NULL, *pRight1 = NULL, *pLeft2 = NULL, *pRight2 = NULL, *opNode1 = NULL, *opNode2 = NULL; SNode *pLeft1 = NULL, *pRight1 = NULL, *pLeft2 = NULL, *pRight2 = NULL, *opNode1 = NULL, *opNode2 = NULL;
SNode *logicNode1 = NULL, *logicNode2 = NULL; SNode *logicNode1 = NULL, *logicNode2 = NULL;
@ -1210,8 +1194,6 @@ TEST(filterModelogicTest, same_column_or_and_or) {
blockDataDestroy(src); blockDataDestroy(src);
} }
TEST(scalarModelogicTest, diff_columns_or_and_or) { TEST(scalarModelogicTest, diff_columns_or_and_or) {
flttInitLogFile(); flttInitLogFile();
@ -1283,8 +1265,6 @@ TEST(scalarModelogicTest, diff_columns_or_and_or) {
blockDataDestroy(src); blockDataDestroy(src);
} }
int main(int argc, char** argv) { int main(int argc, char** argv) {
taosSeedRand(taosGetTimestampSec()); taosSeedRand(taosGetTimestampSec());
testing::InitGoogleTest(&argc, argv); testing::InitGoogleTest(&argc, argv);

View File

@ -0,0 +1,229 @@
#### session windows
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1
system sh/exec.sh -n dnode1 -s start
$loop_cnt = 0
check_dnode_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05
if $data00 != 1 then
return -1
endi
if $data04 != ready then
goto check_dnode_ready
endi
sql connect
$vgroups = 4
$dbNamme = d0
print =============== create database $dbNamme vgroups $vgroups
sql create database $dbNamme vgroups $vgroups
sql show databases
print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
print $data10 $data11 $data12 $data13 $data14 $data15 $data16 $data17 $data18 $data19
#print $data20 $data21 $data22 $data23 $data24 $data25 $data26 $data27 $data28 $data29
sql use $dbNamme
print =============== create super table, child table and insert data
sql create table if not exists st (ts timestamp, tagtype int) tags(dev nchar(50), tag2 binary(16))
sql create table if not exists dev_001 using st tags("dev_01", "tag_01")
sql create table if not exists dev_002 using st tags("dev_02", "tag_02")
sql INSERT INTO dev_001 VALUES('2020-05-13 10:00:00.000', 1)('2020-05-13 10:00:00.005', 2)('2020-05-13 10:00:00.011', 3)
sql INSERT INTO dev_001 VALUES('2020-05-13 10:00:01.011', 4)('2020-05-13 10:00:01.611', 5)('2020-05-13 10:00:02.612', 6)
sql INSERT INTO dev_001 VALUES('2020-05-13 10:01:02.612', 7)('2020-05-13 10:02:02.612', 8)('2020-05-13 10:03:02.613', 9)
sql INSERT INTO dev_001 VALUES('2020-05-13 11:00:00.000', 10)('2020-05-13 12:00:00.000', 11)('2020-05-13 13:00:00.001', 12)
sql INSERT INTO dev_001 VALUES('2020-05-14 13:00:00.001', 13)('2020-05-15 14:00:00.000', 14)('2020-05-20 10:00:00.000', 15)
sql INSERT INTO dev_001 VALUES('2020-05-27 10:00:00.001', 16)
sql INSERT INTO dev_002 VALUES('2020-05-13 10:00:00.000', 1)('2020-05-13 10:00:00.005', 2)('2020-05-13 10:00:00.009', 3)
sql INSERT INTO dev_002 VALUES('2020-05-13 10:00:00.0021', 4)('2020-05-13 10:00:00.031', 5)('2020-05-13 10:00:00.036', 6)('2020-05-13 10:00:00.51', 7)
# session(ts,5a)
print ====> select count(*) from dev_001 session(ts,5a)
sql select count(*) from dev_001 session(ts,5a)
print ====> rows: $rows
print ====> $data00 $data01 $data02 $data03 $data04 $data05
print ====> $data10 $data11 $data12 $data13 $data14 $data15
print ====> $data20 $data21 $data22 $data23 $data24 $data25
print ====> $data30 $data31 $data32 $data33 $data34 $data35
print ====> $data40 $data41 $data42 $data43 $data44 $data45
print ====> $data50 $data51 $data52 $data53 $data54 $data55
print ====> $data60 $data61 $data62 $data63 $data64 $data65
print ====> $data70 $data71 $data72 $data73 $data74 $data75
print ====> $data80 $data81 $data82 $data83 $data84 $data85
print ====> $data90 $data91 $data92 $data93 $data94 $data95
if $rows != 15 then
return -1
endi
if $data01 != 2 then
return -1
endi
return
#
# # session(ts,5a) main query
# tdSql.query("select count(*) from (select * from dev_001) session(ts,5a)")
# tdSql.checkRows(15)
# tdSql.checkData(0, 1, 2)
#
#
# # session(ts,1s)
# tdSql.query("select count(*) from dev_001 session(ts,1s)")
# tdSql.checkRows(12)
# tdSql.checkData(0, 1, 5)
#
# # session(ts,1s) main query
# tdSql.query("select count(*) from (select * from dev_001) session(ts,1s)")
# tdSql.checkRows(12)
# tdSql.checkData(0, 1, 5)
#
# tdSql.query("select count(*) from dev_001 session(ts,1000a)")
# tdSql.checkRows(12)
# tdSql.checkData(0, 1, 5)
#
# tdSql.query("select count(*) from (select * from dev_001) session(ts,1000a)")
# tdSql.checkRows(12)
# tdSql.checkData(0, 1, 5)
#
# # session(ts,1m)
# tdSql.query("select count(*) from dev_001 session(ts,1m)")
# tdSql.checkRows(9)
# tdSql.checkData(0, 1, 8)
#
# # session(ts,1m)
# tdSql.query("select count(*) from (select * from dev_001) session(ts,1m)")
# tdSql.checkRows(9)
# tdSql.checkData(0, 1, 8)
#
# # session(ts,1h)
# tdSql.query("select count(*) from dev_001 session(ts,1h)")
# tdSql.checkRows(6)
# tdSql.checkData(0, 1, 11)
#
# # session(ts,1h)
# tdSql.query("select count(*) from (select * from dev_001) session(ts,1h)")
# tdSql.checkRows(6)
# tdSql.checkData(0, 1, 11)
#
# # session(ts,1d)
# tdSql.query("select count(*) from dev_001 session(ts,1d)")
# tdSql.checkRows(4)
# tdSql.checkData(0, 1, 13)
#
# # session(ts,1d)
# tdSql.query("select count(*) from (select * from dev_001) session(ts,1d)")
# tdSql.checkRows(4)
# tdSql.checkData(0, 1, 13)
#
# # session(ts,1w)
# tdSql.query("select count(*) from dev_001 session(ts,1w)")
# tdSql.checkRows(2)
# tdSql.checkData(0, 1, 15)
#
# # session(ts,1w)
# tdSql.query("select count(*) from (select * from dev_001) session(ts,1w)")
# tdSql.checkRows(2)
# tdSql.checkData(0, 1, 15)
#
# # session with where
# tdSql.query("select count(*),first(tagtype),last(tagtype),avg(tagtype),sum(tagtype),min(tagtype),max(tagtype),leastsquares(tagtype, 1, 1),spread(tagtype),stddev(tagtype),percentile(tagtype,0) from dev_001 where ts <'2020-05-20 0:0:0' session(ts,1d)")
#
# tdSql.checkRows(2)
# tdSql.checkData(0, 1, 13)
# tdSql.checkData(0, 2, 1)
# tdSql.checkData(0, 3, 13)
# tdSql.checkData(0, 4, 7)
# tdSql.checkData(0, 5, 91)
# tdSql.checkData(0, 6, 1)
# tdSql.checkData(0, 7, 13)
# tdSql.checkData(0, 8, '{slop:1.000000, intercept:0.000000}')
# tdSql.checkData(0, 9, 12)
# tdSql.checkData(0, 10, 3.741657387)
# tdSql.checkData(0, 11, 1)
# tdSql.checkData(1, 11, 14)
#
# # session with where main
#
# tdSql.query("select count(*),first(tagtype),last(tagtype),avg(tagtype),sum(tagtype),min(tagtype),max(tagtype),leastsquares(tagtype, 1, 1) from (select * from dev_001 where ts <'2020-05-20 0:0:0') session(ts,1d)")
#
# tdSql.checkRows(2)
# tdSql.checkData(0, 1, 13)
# tdSql.checkData(0, 2, 1)
# tdSql.checkData(0, 3, 13)
# tdSql.checkData(0, 4, 7)
# tdSql.checkData(0, 5, 91)
# tdSql.checkData(0, 6, 1)
# tdSql.checkData(0, 7, 13)
# tdSql.checkData(0, 8, '{slop:1.000000, intercept:0.000000}')
#
# # tdsql err
# tdSql.error("select * from dev_001 session(ts,1w)")
# tdSql.error("select count(*) from st session(ts,1w)")
# tdSql.error("select count(*) from dev_001 group by tagtype session(ts,1w) ")
# tdSql.error("select count(*) from dev_001 session(ts,1n)")
# tdSql.error("select count(*) from dev_001 session(ts,1y)")
# tdSql.error("select count(*) from dev_001 session(ts,0s)")
# tdSql.error("select count(*) from dev_001 session(i,1y)")
# tdSql.error("select count(*) from dev_001 session(ts,1d) where ts <'2020-05-20 0:0:0'")
#
# #test precision us
# tdSql.execute("create database test precision 'us'")
# tdSql.execute("use test")
# tdSql.execute("create table dev_001 (ts timestamp ,i timestamp ,j int)")
# tdSql.execute("insert into dev_001 values(1623046993681000,now,1)(1623046993681001,now+1s,2)(1623046993681002,now+2s,3)(1623046993681004,now+5s,4)")
#
# # session(ts,1u)
# tdSql.query("select count(*) from dev_001 session(ts,1u)")
# tdSql.checkRows(2)
# tdSql.checkData(0, 1, 3)
# tdSql.error("select count(*) from dev_001 session(i,1s)")
# # test second timestamp fileds
# tdSql.execute("create table secondts(ts timestamp,t2 timestamp,i int)")
# tdSql.error("select count(*) from secondts session(t2,2s)")
#
#
#if $loop_test == 0 then
# print =============== stop and restart taosd
# system sh/exec.sh -n dnode1 -s stop -x SIGINT
# system sh/exec.sh -n dnode1 -s start
#
# $loop_cnt = 0
# check_dnode_ready_0:
# $loop_cnt = $loop_cnt + 1
# sleep 200
# if $loop_cnt == 10 then
# print ====> dnode not ready!
# return -1
# endi
# sql show dnodes
# print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05
# if $data00 != 1 then
# return -1
# endi
# if $data04 != ready then
# goto check_dnode_ready_0
# endi
#
# $loop_test = 1
# goto loop_test_pos
#endi
#
#system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -91,20 +91,23 @@ function runSimCaseOneByOnefq {
for ((i=$start;i<=$end;i++)) ; do for ((i=$start;i<=$end;i++)) ; do
line=`sed -n "$i"p jenkins/basic.txt` line=`sed -n "$i"p jenkins/basic.txt`
if [[ $line =~ ^./test.sh* ]] || [[ $line =~ ^run* ]]; then if [[ $line =~ ^./test.sh* ]] || [[ $line =~ ^run* ]]; then
case=`echo $line | grep sim$ |awk '{print $NF}'` #case=`echo $line | grep sim$ |awk '{print $NF}'`
case=`echo $line | grep -o ".*\.sim" |awk '{print $NF}'`
start_time=`date +%s` start_time=`date +%s`
date +%F\ %T | tee -a out.log date +%F\ %T | tee -a out.log
if [[ "$tests_dir" == *"$IN_TDINTERNAL"* ]]; then if [[ "$tests_dir" == *"$IN_TDINTERNAL"* ]]; then
echo -n $case #echo -n $case
./test.sh -f $case > case.log 2>&1 \ echo -n $line
$line > case.log 2>&1 \
&& \ && \
([ -f ../../../sim/tsim/log/taoslog0.0 ] && grep -q 'script.*'$case'.*failed.*, err.*lineNum' ../../../sim/tsim/log/taoslog0.0 && echo -e "${RED} failed${NC}" | tee -a out.log || echo -e "${GREEN} success${NC}" | tee -a out.log )|| \ ([ -f ../../../sim/tsim/log/taoslog0.0 ] && grep -q 'script.*'$case'.*failed.*, err.*lineNum' ../../../sim/tsim/log/taoslog0.0 && echo -e "${RED} failed${NC}" | tee -a out.log || echo -e "${GREEN} success${NC}" | tee -a out.log )|| \
([ -f ../../../sim/tsim/log/taoslog0.0 ] && grep -q 'script.*success.*m$' ../../../sim/tsim/log/taoslog0.0 && echo -e "${GREEN} success${NC}" | tee -a out.log ) || \ ([ -f ../../../sim/tsim/log/taoslog0.0 ] && grep -q 'script.*success.*m$' ../../../sim/tsim/log/taoslog0.0 && echo -e "${GREEN} success${NC}" | tee -a out.log ) || \
( echo -e "${RED} failed${NC}" | tee -a out.log && echo '=====================log=====================' && cat case.log ) ( echo -e "${RED} failed${NC}" | tee -a out.log && echo '=====================log=====================' && cat case.log )
else else
echo -n $case #echo -n $case
./test.sh -f $case > ../../sim/case.log 2>&1 && \ echo -n $line
$line > ../../sim/case.log 2>&1 && \
([ -f ../../sim/tsim/log/taoslog0.0 ] && grep -q 'script.*'$case'.*failed.*, err.*lineNum' ../../sim/tsim/log/taoslog0.0 && echo -e "${RED} failed${NC}" | tee -a out.log || echo -e "${GREEN} success${NC}" | tee -a out.log )|| \ ([ -f ../../sim/tsim/log/taoslog0.0 ] && grep -q 'script.*'$case'.*failed.*, err.*lineNum' ../../sim/tsim/log/taoslog0.0 && echo -e "${RED} failed${NC}" | tee -a out.log || echo -e "${GREEN} success${NC}" | tee -a out.log )|| \
([ -f ../../sim/tsim/log/taoslog0.0 ] && grep -q 'script.*success.*m$' ../../sim/tsim/log/taoslog0.0 && echo -e "${GREEN} success${NC}" | tee -a out.log ) || \ ([ -f ../../sim/tsim/log/taoslog0.0 ] && grep -q 'script.*success.*m$' ../../sim/tsim/log/taoslog0.0 && echo -e "${GREEN} success${NC}" | tee -a out.log ) || \
( echo -e "${RED} failed${NC}" | tee -a out.log && echo '=====================log=====================' && pwd && cat ../../sim/case.log ) ( echo -e "${RED} failed${NC}" | tee -a out.log && echo '=====================log=====================' && pwd && cat ../../sim/case.log )