other: merge 3.0

This commit is contained in:
Cary Xu 2022-06-09 20:08:50 +08:00
commit fff84f1cb4
61 changed files with 991 additions and 389 deletions

View File

@ -2536,6 +2536,7 @@ typedef struct {
int32_t tSerializeSTableIndexRsp(void* buf, int32_t bufLen, const STableIndexRsp* pRsp);
int32_t tDeserializeSTableIndexRsp(void* buf, int32_t bufLen, STableIndexRsp* pRsp);
void tFreeSTableIndexInfo(void *pInfo);
typedef struct {
int8_t mqMsgType;

View File

@ -67,6 +67,7 @@ typedef struct SCatalogReq {
SArray *pUdf; // element is udf name
SArray *pIndex; // element is index name
SArray *pUser; // element is SUserAuthInfo
SArray *pTableIndex; // element is SNAME
bool qNodeRequired; // valid qnode
bool forceUpdate;
} SCatalogReq;
@ -82,6 +83,7 @@ typedef struct SMetaData {
SArray *pDbInfo; // pRes = SDbInfo*
SArray *pTableMeta; // pRes = STableMeta*
SArray *pTableHash; // pRes = SVgroupInfo*
SArray *pTableIndex; // pRes = SArray<STableIndexInfo>*
SArray *pUdfList; // pRes = SFuncInfo*
SArray *pIndex; // pRes = SIndexInfo*
SArray *pUser; // pRes = bool*
@ -277,7 +279,7 @@ int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo);
int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* tbFName, SArray** pRes);
int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pRes);
int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo);

View File

@ -146,6 +146,7 @@ typedef struct SqlFunctionCtx {
struct SDiskbasedBuf *pBuf;
struct SSDataBlock *pSrcBlock;
int32_t curBufPage;
bool increase;
char udfName[TSDB_FUNC_NAME_LEN];
} SqlFunctionCtx;

View File

@ -129,6 +129,8 @@ typedef enum EFunctionType {
FUNCTION_TYPE_SPREAD_MERGE,
FUNCTION_TYPE_HISTOGRAM_PARTIAL,
FUNCTION_TYPE_HISTOGRAM_MERGE,
FUNCTION_TYPE_HYPERLOGLOG_PARTIAL,
FUNCTION_TYPE_HYPERLOGLOG_MERGE,
// user defined funcion
FUNCTION_TYPE_UDF = 10000

View File

@ -304,6 +304,7 @@ typedef struct SDownstreamSourceNode {
typedef struct SExchangePhysiNode {
SPhysiNode node;
int32_t srcGroupId; // group id of datasource suplans
bool singleChannel;
SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode
} SExchangePhysiNode;

View File

@ -1758,7 +1758,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
taosArrayClear(tagArray);
taosArrayPush(tagArray, &tagVal);
tTagNew(tagArray, 1, false, &pTag);
if (!pTag) {
if (pTag == NULL) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
return NULL;
@ -1769,9 +1769,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
tdDestroySVCreateTbReq(&createTbReq);
if (code < 0) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
return NULL;
}
@ -1810,8 +1808,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
int32_t schemaLen = 0;
if (createTb) {
SVCreateTbReq createTbReq = {0};
char* cname = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
snprintf(cname, TSDB_TABLE_FNAME_LEN, "%s:%ld", stbFullName, pDataBlock->info.groupId);
char* cname = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId);
createTbReq.name = cname;
createTbReq.flags = 0;
createTbReq.type = TSDB_CHILD_TABLE;
@ -1825,7 +1822,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
taosArrayPush(tagArray, &tagVal);
STag* pTag = NULL;
tTagNew(tagArray, 1, false, &pTag);
if (!pTag) {
if (pTag == NULL) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
taosMemoryFreeClear(ret);
@ -1951,7 +1948,6 @@ void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen
const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData) {
blockDataEnsureCapacity(pBlock, numOfRows);
pBlock->info.rows = numOfRows;
const char* pStart = pData;
@ -2025,6 +2021,7 @@ const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t
pStart += colLen[i];
}
pBlock->info.rows = numOfRows;
ASSERT(pStart - pData == dataLen);
return pStart;
}

View File

@ -2491,6 +2491,16 @@ int32_t tDeserializeSTableIndexRsp(void *buf, int32_t bufLen, STableIndexRsp *pR
return 0;
}
void tFreeSTableIndexInfo(void* info) {
if (NULL == info) {
return;
}
STableIndexInfo *pInfo = (STableIndexInfo*)info;
taosMemoryFree(pInfo->expr);
}
int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);

View File

@ -186,7 +186,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
SVgObj* pVgroup;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pStream->dbUid) {
if (strcmp(pVgroup->dbName, pStream->targetDb) != 0) {
sdbRelease(pSdb, pVgroup);
continue;
}
@ -286,7 +286,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pStream->tasks = taosArrayInit(totLevel, sizeof(void*));
bool hasExtraSink = false;
if (totLevel == 2 || strcmp(pStream->sourceDb, pStream->targetDb) != 0) {
bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
if (totLevel == 2 || externalTargetDB) {
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
taosArrayPush(pStream->tasks, &taskOneLevel);
// add extra sink
@ -405,7 +406,6 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
if (pStream->fixedSinkVgId == 0) {
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
/*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
ASSERT(pDb);
@ -426,10 +426,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
for (int32_t j = 0; j < sinkLvSize; j++) {
SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j);
/*printf("vgid %d node id %d\n", pVgInfo->vgId, pTask->nodeId);*/
if (pLastLevelTask->nodeId == pVgInfo->vgId) {
pVgInfo->taskId = pLastLevelTask->taskId;
/*printf("taskid %d set to %d\n", pVgInfo->taskId, pTask->taskId);*/
break;
}
}

View File

@ -67,6 +67,7 @@ typedef enum {
CTG_TASK_GET_DB_INFO,
CTG_TASK_GET_TB_META,
CTG_TASK_GET_TB_HASH,
CTG_TASK_GET_TB_INDEX,
CTG_TASK_GET_INDEX,
CTG_TASK_GET_UDF,
CTG_TASK_GET_USER,
@ -93,6 +94,10 @@ typedef struct SCtgTbMetaCtx {
int32_t flag;
} SCtgTbMetaCtx;
typedef struct SCtgTbIndexCtx {
SName* pName;
} SCtgTbIndexCtx;
typedef struct SCtgDbVgCtx {
char dbFName[TSDB_DB_FNAME_LEN];
} SCtgDbVgCtx;
@ -189,6 +194,7 @@ typedef struct SCtgJob {
int32_t indexNum;
int32_t userNum;
int32_t dbInfoNum;
int32_t tbIndexNum;
} SCtgJob;
typedef struct SCtgMsgCtx {
@ -490,7 +496,7 @@ int32_t ctgGetDBVgInfoFromMnode(CTG_PARAMS, SBuildUseDBInput *input, SUseDbOutpu
int32_t ctgGetQnodeListFromMnode(CTG_PARAMS, SArray *out, SCtgTask* pTask);
int32_t ctgGetDBCfgFromMnode(CTG_PARAMS, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask);
int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo *out, SCtgTask* pTask);
int32_t ctgGetTbIndexFromMnode(CTG_PARAMS, const char *tbFName, SArray** out, SCtgTask* pTask);
int32_t ctgGetTbIndexFromMnode(CTG_PARAMS, SName* name, SArray** out, SCtgTask* pTask);
int32_t ctgGetUdfInfoFromMnode(CTG_PARAMS, const char *funcName, SFuncInfo *out, SCtgTask* pTask);
int32_t ctgGetUserDbAuthFromMnode(CTG_PARAMS, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromMnodeImpl(CTG_PARAMS, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask);

View File

@ -1136,14 +1136,14 @@ int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps
CTG_API_LEAVE(ctgGetIndexInfoFromMnode(CTG_PARAMS_LIST(), indexName, pInfo, NULL));
}
int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* tbFName, SArray** pRes) {
int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pRes) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == tbFName || NULL == pRes) {
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTableName || NULL == pRes) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_API_LEAVE(ctgGetTbIndexFromMnode(CTG_PARAMS_LIST(), tbFName, pRes, NULL));
CTG_API_LEAVE(ctgGetTbIndexFromMnode(CTG_PARAMS_LIST(), (SName*)pTableName, pRes, NULL));
}

View File

@ -44,7 +44,7 @@ int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, SName *name) {
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, tableName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname);
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname);
return TSDB_CODE_SUCCESS;
}
@ -232,6 +232,35 @@ int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, SUserAuthInfo *user)
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitGetTbIndexTask(SCtgJob *pJob, int32_t taskIdx, SName *name) {
SCtgTask task = {0};
task.type = CTG_TASK_GET_TB_INDEX;
task.taskId = taskIdx;
task.pJob = pJob;
task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgTbIndexCtx));
if (NULL == task.taskCtx) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
SCtgTbIndexCtx* ctx = task.taskCtx;
ctx->pName = taosMemoryMalloc(sizeof(*name));
if (NULL == ctx->pName) {
taosMemoryFree(task.taskCtx);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(ctx->pName, name, sizeof(*name));
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:%" PRIx64 " the %d task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname);
return TSDB_CODE_SUCCESS;
}
int32_t ctgHandleForceUpdate(SCatalog* pCtg, SCtgJob *pJob, const SCatalogReq* pReq) {
int32_t dbNum = pJob->dbCfgNum + pJob->dbVgNum + pJob->dbInfoNum;
if (dbNum > 0) {
@ -329,8 +358,9 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
int32_t indexNum = (int32_t)taosArrayGetSize(pReq->pIndex);
int32_t userNum = (int32_t)taosArrayGetSize(pReq->pUser);
int32_t dbInfoNum = (int32_t)taosArrayGetSize(pReq->pDbInfo);
int32_t tbIndexNum = (int32_t)taosArrayGetSize(pReq->pTableIndex);
*taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dbCfgNum + indexNum + userNum + dbInfoNum;
*taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum;
if (*taskNum <= 0) {
ctgDebug("Empty input for job, no need to retrieve meta, reqId:0x%" PRIx64, reqId);
return TSDB_CODE_SUCCESS;
@ -360,6 +390,7 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
pJob->indexNum = indexNum;
pJob->userNum = userNum;
pJob->dbInfoNum = dbInfoNum;
pJob->tbIndexNum = tbIndexNum;
pJob->pTasks = taosArrayInit(*taskNum, sizeof(SCtgTask));
@ -398,6 +429,11 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
CTG_ERR_JRET(ctgInitGetTbHashTask(pJob, taskIdx++, name));
}
for (int32_t i = 0; i < tbIndexNum; ++i) {
SName* name = taosArrayGet(pReq->pTableIndex, i);
CTG_ERR_JRET(ctgInitGetTbIndexTask(pJob, taskIdx++, name));
}
for (int32_t i = 0; i < indexNum; ++i) {
char* indexName = taosArrayGet(pReq->pIndex, i);
CTG_ERR_JRET(ctgInitGetIndexTask(pJob, taskIdx++, indexName));
@ -479,6 +515,21 @@ int32_t ctgDumpTbHashRes(SCtgTask* pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgDumpTbIndexRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob;
if (NULL == pJob->jobRes.pTableIndex) {
pJob->jobRes.pTableIndex = taosArrayInit(pJob->tbIndexNum, sizeof(SMetaRes));
if (NULL == pJob->jobRes.pTableIndex) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
}
SMetaRes res = {.code = pTask->code, .pRes = pTask->res};
taosArrayPush(pJob->jobRes.pTableHash, &res);
return TSDB_CODE_SUCCESS;
}
int32_t ctgDumpIndexRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob;
if (NULL == pJob->jobRes.pIndex) {
@ -817,6 +868,20 @@ _return:
CTG_RET(code);
}
int32_t ctgHandleGetTbIndexRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
TSWAP(pTask->res, pTask->msgCtx.out);
_return:
ctgHandleTaskEnd(pTask, code);
CTG_RET(code);
}
int32_t ctgHandleGetDbCfgRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
@ -1056,13 +1121,24 @@ _return:
CTG_RET(code);
}
int32_t ctgLaunchGetTbIndexTask(SCtgTask *pTask) {
int32_t code = 0;
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
SCtgTbIndexCtx* pCtx = (SCtgTbIndexCtx*)pTask->taskCtx;
CTG_ERR_RET(ctgGetTbIndexFromMnode(CTG_PARAMS_LIST(), pCtx->pName, NULL, pTask));
return TSDB_CODE_SUCCESS;
}
int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
CTG_ERR_RET(ctgGetQnodeListFromMnode(CTG_PARAMS_LIST(), NULL, pTask));
return TSDB_CODE_SUCCESS;
}
@ -1168,15 +1244,16 @@ int32_t ctgRelaunchGetTbMetaTask(SCtgTask *pTask) {
}
SCtgAsyncFps gCtgAsyncFps[] = {
{ctgLaunchGetQnodeTask, ctgHandleGetQnodeRsp, ctgDumpQnodeRes},
{ctgLaunchGetDbVgTask, ctgHandleGetDbVgRsp, ctgDumpDbVgRes},
{ctgLaunchGetDbCfgTask, ctgHandleGetDbCfgRsp, ctgDumpDbCfgRes},
{ctgLaunchGetDbInfoTask, ctgHandleGetDbInfoRsp, ctgDumpDbInfoRes},
{ctgLaunchGetTbMetaTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetaRes},
{ctgLaunchGetTbHashTask, ctgHandleGetTbHashRsp, ctgDumpTbHashRes},
{ctgLaunchGetIndexTask, ctgHandleGetIndexRsp, ctgDumpIndexRes},
{ctgLaunchGetUdfTask, ctgHandleGetUdfRsp, ctgDumpUdfRes},
{ctgLaunchGetUserTask, ctgHandleGetUserRsp, ctgDumpUserRes},
{ctgLaunchGetQnodeTask, ctgHandleGetQnodeRsp, ctgDumpQnodeRes},
{ctgLaunchGetDbVgTask, ctgHandleGetDbVgRsp, ctgDumpDbVgRes},
{ctgLaunchGetDbCfgTask, ctgHandleGetDbCfgRsp, ctgDumpDbCfgRes},
{ctgLaunchGetDbInfoTask, ctgHandleGetDbInfoRsp, ctgDumpDbInfoRes},
{ctgLaunchGetTbMetaTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetaRes},
{ctgLaunchGetTbHashTask, ctgHandleGetTbHashRsp, ctgDumpTbHashRes},
{ctgLaunchGetTbIndexTask, ctgHandleGetTbIndexRsp, ctgDumpTbIndexRes},
{ctgLaunchGetIndexTask, ctgHandleGetIndexRsp, ctgDumpIndexRes},
{ctgLaunchGetUdfTask, ctgHandleGetUdfRsp, ctgDumpUdfRes},
{ctgLaunchGetUserTask, ctgHandleGetUserRsp, ctgDumpUserRes},
};
int32_t ctgMakeAsyncRes(SCtgJob *pJob) {

View File

@ -427,11 +427,13 @@ int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo *
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTbIndexFromMnode(CTG_PARAMS, const char *tbFName, SArray** out, SCtgTask* pTask) {
int32_t ctgGetTbIndexFromMnode(CTG_PARAMS, SName *name, SArray** out, SCtgTask* pTask) {
char *msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_TABLE_INDEX;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(name, tbFName);
ctgDebug("try to get tb index from mnode, tbFName:%s", tbFName);

View File

@ -60,6 +60,9 @@ void ctgFreeSMetaData(SMetaData* pData) {
taosArrayDestroy(pData->pTableHash);
pData->pTableHash = NULL;
taosArrayDestroy(pData->pTableIndex);
pData->pTableIndex = NULL;
taosArrayDestroy(pData->pUdfList);
pData->pUdfList = NULL;
@ -248,6 +251,14 @@ void ctgFreeMsgCtx(SCtgMsgCtx* pCtx) {
taosMemoryFreeClear(pCtx->out);
break;
}
case TDMT_MND_GET_TABLE_INDEX: {
SArray** pOut = (SArray**)pCtx->out;
if (pOut) {
taosArrayDestroyEx(*pOut, tFreeSTableIndexInfo);
taosMemoryFreeClear(pCtx->out);
}
break;
}
case TDMT_MND_RETRIEVE_FUNC: {
SFuncInfo* pOut = (SFuncInfo*)pCtx->out;
taosMemoryFree(pOut->pCode);
@ -344,6 +355,13 @@ void ctgFreeTask(SCtgTask* pTask) {
taosMemoryFreeClear(pTask->res);
break;
}
case CTG_TASK_GET_TB_INDEX: {
SCtgTbIndexCtx* taskCtx = (SCtgTbIndexCtx*)pTask->taskCtx;
taosMemoryFreeClear(taskCtx->pName);
taosMemoryFreeClear(pTask->taskCtx);
taosArrayDestroyEx(pTask->res, tFreeSTableIndexInfo);
break;
}
case CTG_TASK_GET_INDEX: {
taosMemoryFreeClear(pTask->taskCtx);
taosMemoryFreeClear(pTask->res);

View File

@ -32,15 +32,17 @@ extern "C" {
#define EXPLAIN_PROJECTION_FORMAT "Projection"
#define EXPLAIN_JOIN_FORMAT "%s"
#define EXPLAIN_AGG_FORMAT "Aggragate"
#define EXPLAIN_INDEF_ROWS_FORMAT "Indefinite Rows Function"
#define EXPLAIN_EXCHANGE_FORMAT "Data Exchange %d:1"
#define EXPLAIN_SORT_FORMAT "Sort"
#define EXPLAIN_INTERVAL_FORMAT "Interval on Column %s"
#define EXPLAIN_FILL_FORMAT "Fill"
#define EXPLAIN_SESSION_FORMAT "Session"
#define EXPLAIN_STATE_WINDOW_FORMAT "StateWindow on Column %s"
#define EXPLAIN_PARITION_FORMAT "Partition on Column %s"
#define EXPLAIN_ORDER_FORMAT "Order: %s"
#define EXPLAIN_FILTER_FORMAT "Filter: "
#define EXPLAIN_FILL_FORMAT "Fill: %s"
#define EXPLAIN_FILL_VALUE_FORMAT "Fill Values: "
#define EXPLAIN_ON_CONDITIONS_FORMAT "Join Cond: "
#define EXPLAIN_TIMERANGE_FORMAT "Time Range: [%" PRId64 ", %" PRId64 "]"
#define EXPLAIN_OUTPUT_FORMAT "Output: "
@ -66,6 +68,8 @@ extern "C" {
#define EXPLAIN_WIDTH_FORMAT "width=%d"
#define EXPLAIN_FUNCTIONS_FORMAT "functions=%d"
#define EXPLAIN_EXECINFO_FORMAT "cost=%.3f..%.3f rows=%" PRIu64
#define EXPLAIN_MODE_FORMAT "mode=%s"
#define EXPLAIN_STRING_TYPE_FORMAT "%s"
typedef struct SExplainGroup {
int32_t nodeNum;

View File

@ -179,6 +179,21 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo
pPhysiChildren = mergePhysiNode->node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC: {
SIndefRowsFuncPhysiNode *indefPhysiNode = (SIndefRowsFuncPhysiNode *)pNode;
pPhysiChildren = indefPhysiNode->node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: {
SMergeIntervalPhysiNode *intPhysiNode = (SMergeIntervalPhysiNode *)pNode;
pPhysiChildren = intPhysiNode->window.node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_FILL: {
SFillPhysiNode *fillPhysiNode = (SFillPhysiNode *)pNode;
pPhysiChildren = fillPhysiNode->node.pChildren;
break;
}
default:
qError("not supported physical node type %d", pNode->type);
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
@ -212,12 +227,15 @@ int32_t qExplainGenerateResNodeExecInfo(SArray **pExecInfo, SExplainGroup *group
SExplainRsp *rsp = NULL;
for (int32_t i = 0; i < group->nodeNum; ++i) {
rsp = taosArrayGet(group->nodeExecInfo, i);
/*
if (group->physiPlanExecIdx >= rsp->numOfPlans) {
qError("physiPlanIdx %d exceed plan num %d", group->physiPlanExecIdx, rsp->numOfPlans);
return TSDB_CODE_QRY_APP_ERROR;
}
taosArrayPush(*pExecInfo, rsp->subplanInfo + group->physiPlanExecIdx);
*/
taosArrayPush(*pExecInfo, rsp->subplanInfo);
}
++group->physiPlanExecIdx;
@ -599,6 +617,42 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC: {
SIndefRowsFuncPhysiNode *pIndefNode = (SIndefRowsFuncPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_INDEF_ROWS_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
if (pIndefNode->pVectorFuncs) {
EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pIndefNode->pVectorFuncs->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pIndefNode->node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
if (verbose) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
nodesGetOutputNumFromSlotList(pIndefNode->node.pOutputDataBlockDesc->pSlots));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pIndefNode->node.pOutputDataBlockDesc->outputRowSize);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pIndefNode->node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pIndefNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE,
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: {
SExchangePhysiNode *pExchNode = (SExchangePhysiNode *)pNode;
SExplainGroup *group = taosHashGet(ctx->groupHash, &pExchNode->srcGroupId, sizeof(pExchNode->srcGroupId));
@ -607,7 +661,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
EXPLAIN_ROW_NEW(level, EXPLAIN_EXCHANGE_FORMAT, group->nodeNum);
EXPLAIN_ROW_NEW(level, EXPLAIN_EXCHANGE_FORMAT, pExchNode->singleChannel ? 1 : group->nodeNum);
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
@ -750,6 +804,106 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: {
SMergeIntervalPhysiNode *pIntNode = (SMergeIntervalPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_INTERVAL_FORMAT, nodesGetNameFromColumnNode(pIntNode->window.pTspk));
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pIntNode->window.pFuncs->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pIntNode->window.node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
if (verbose) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
nodesGetOutputNumFromSlotList(pIntNode->window.node.pOutputDataBlockDesc->pSlots));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pIntNode->window.node.pOutputDataBlockDesc->outputRowSize);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
uint8_t precision = getIntervalPrecision(pIntNode);
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIME_WINDOWS_FORMAT,
INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->interval, pIntNode->intervalUnit, precision),
pIntNode->intervalUnit, pIntNode->offset, getPrecisionUnit(precision),
INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->sliding, pIntNode->slidingUnit, precision),
pIntNode->slidingUnit);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pIntNode->window.node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pIntNode->window.node.pConditions, tbuf + VARSTR_HEADER_SIZE,
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_FILL: {
SFillPhysiNode *pFillNode = (SFillPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_FILL_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_MODE_FORMAT, nodesGetFillModeString(pFillNode->mode));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pFillNode->node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
if (verbose) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
nodesGetOutputNumFromSlotList(pFillNode->node.pOutputDataBlockDesc->pSlots));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pFillNode->node.pOutputDataBlockDesc->outputRowSize);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pFillNode->pValues) {
SNodeListNode *pValues = (SNodeListNode*)pFillNode->pValues;
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILL_VALUE_FORMAT);
SNode* tNode = NULL;
int32_t i = 0;
FOREACH(tNode, pValues->pNodeList) {
if (i) {
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
SValueNode* tValue = (SValueNode*)tNode;
char *value = nodesGetStrValueFromNode(tValue);
EXPLAIN_ROW_APPEND(EXPLAIN_STRING_TYPE_FORMAT, value);
taosMemoryFree(value);
++i;
}
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIMERANGE_FORMAT, pFillNode->timeRange.skey,
pFillNode->timeRange.ekey);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pFillNode->node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pFillNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE,
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION: {
SSessionWinodwPhysiNode *pSessNode = (SSessionWinodwPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_SESSION_FORMAT);

View File

@ -804,7 +804,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo, bool isStream);
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild);
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
@ -892,8 +892,8 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
int32_t initSessionAggSupporter(SStreamAggSupporter* pSup, const char* pKey);
int32_t initStateAggSupporter(SStreamAggSupporter* pSup, const char* pKey);
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey,
SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t size);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize);
SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap, int32_t* pIndex);
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows,

View File

@ -1130,6 +1130,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
pCtx->start.key = INT64_MIN;
pCtx->end.key = INT64_MIN;
pCtx->numOfParams = pExpr->base.numOfParams;
pCtx->increase = false;
pCtx->param = pFunct->pParam;
// for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
@ -2008,8 +2009,16 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
// the _wstartts needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
if (pCtx[j].increase) {
int64_t ts = *(int64_t*) in;
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
colDataAppend(pColInfoData, pBlock->info.rows + k, (const char *)&ts, pCtx[j].resultInfo->isNullRes);
ts++;
}
} else {
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
}
}
}
}
@ -4676,7 +4685,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo);
bool isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo, isStream);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
int32_t children = 8;
@ -5339,7 +5349,8 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
return TSDB_CODE_SUCCESS;
}
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, size_t size) {
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t size) {
pSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
pSup->keySize = sizeof(int64_t) + sizeof(TSKEY);
pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize);
pSup->pResultRows = taosArrayInit(1024, size);
@ -5358,15 +5369,11 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, size
if (bufSize <= pageSize) {
bufSize = pageSize * 4;
}
return createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, TD_TMP_DIR_PATH);
}
int32_t initSessionAggSupporter(SStreamAggSupporter* pSup, const char* pKey) {
return initStreamAggSupporter(pSup, pKey, sizeof(SResultWindowInfo));
}
int32_t initStateAggSupporter(SStreamAggSupporter* pSup, const char* pKey) {
return initStreamAggSupporter(pSup, pKey, sizeof(SStateWindowInfo));
int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, TD_TMP_DIR_PATH);
for(int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].pBuf = pSup->pResultBuf;
}
return code;
}
int64_t getSmaWaterMark(int64_t interval, double filesFactor) {

View File

@ -1438,9 +1438,15 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
return needed;
}
void increaseTs(SqlFunctionCtx* pCtx) {
if (pCtx[0].pExpr->pExpr->_function.pFunctNode->funcType == FUNCTION_TYPE_WSTARTTS) {
pCtx[0].increase = true;
}
}
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo, bool isStream) {
SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
@ -1461,6 +1467,11 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
int32_t code =
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
if (isStream) {
ASSERT(numOfCols > 0);
increaseTs(pInfo->binfo.pCtx);
}
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
pInfo->invertible = allInvertible(pInfo->binfo.pCtx, numOfCols);
@ -2128,6 +2139,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols,
pResBlock, keyBufSize, pTaskInfo->id.str);
ASSERT(numOfCols > 0);
increaseTs(pInfo->binfo.pCtx);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
@ -2212,6 +2225,8 @@ int32_t initBiasicInfo(SOptrBasicInfo* pBasicInfo, SExprInfo* pExprInfo,
for (int32_t i = 0; i < numOfCols; ++i) {
pBasicInfo->pCtx[i].pBuf = NULL;
}
ASSERT(numOfCols > 0);
increaseTs(pBasicInfo->pCtx);
return TSDB_CODE_SUCCESS;
}
@ -2228,6 +2243,10 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, int
pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, waterMark);
}
int32_t initSessionAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
return initStreamAggSupporter(pSup, pKey, pCtx, numOfOutput, sizeof(SResultWindowInfo));
}
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId,
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
@ -2244,8 +2263,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SEx
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->streamAggSup.resultRowSize = getResultRowSize(pInfo->binfo.pCtx, numOfCols);
code = initSessionAggSupporter(&pInfo->streamAggSup, "StreamSessionAggOperatorInfo");
code = initSessionAggSupporter(&pInfo->streamAggSup, "StreamSessionAggOperatorInfo", pInfo->binfo.pCtx, numOfCols);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
@ -3097,6 +3116,10 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
}
int32_t initStateAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
return initStreamAggSupporter(pSup, pKey, pCtx, numOfOutput, sizeof(SStateWindowInfo));
}
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo) {
SStreamStateWinodwPhysiNode* pStateNode = (SStreamStateWinodwPhysiNode*)pPhyNode;
@ -3130,8 +3153,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
goto _error;
}
pInfo->streamAggSup.resultRowSize = getResultRowSize(pInfo->binfo.pCtx, numOfCols);
code = initStateAggSupporter(&pInfo->streamAggSup, "StreamStateAggOperatorInfo");
code = initStateAggSupporter(&pInfo->streamAggSup, "StreamStateAggOperatorInfo", pInfo->binfo.pCtx, numOfCols);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}

View File

@ -102,6 +102,8 @@ bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
int32_t topFunction(SqlFunctionCtx *pCtx);
int32_t bottomFunction(SqlFunctionCtx *pCtx);
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
@ -126,7 +128,10 @@ int32_t getHistogramInfoSize();
bool getHLLFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t hllFunction(SqlFunctionCtx* pCtx);
int32_t hllFunctionMerge(SqlFunctionCtx* pCtx);
int32_t hllFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t hllPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t getHLLInfoSize();
bool getStateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool stateFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);

View File

@ -313,6 +313,7 @@ static int32_t translateApercentileImpl(SFunctionNode* pFunc, char* pErrBuf, int
static int32_t translateApercentilePartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateApercentileImpl(pFunc, pErrBuf, len, true);
}
static int32_t translateApercentileMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateApercentileImpl(pFunc, pErrBuf, len, false);
}
@ -401,6 +402,7 @@ static int32_t translateSpreadImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t
static int32_t translateSpreadPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateSpreadImpl(pFunc, pErrBuf, len, true);
}
static int32_t translateSpreadMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateSpreadImpl(pFunc, pErrBuf, len, false);
}
@ -551,6 +553,7 @@ static int32_t translateHistogramImpl(SFunctionNode* pFunc, char* pErrBuf, int32
static int32_t translateHistogramPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateHistogramImpl(pFunc, pErrBuf, len, true);
}
static int32_t translateHistogramMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateHistogramImpl(pFunc, pErrBuf, len, false);
}
@ -564,6 +567,28 @@ static int32_t translateHLL(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return TSDB_CODE_SUCCESS;
}
static int32_t translateHLLImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) {
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
if (isPartial) {
pFunc->node.resType = (SDataType){.bytes = getHistogramInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
} else {
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateHLLPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateHLLImpl(pFunc, pErrBuf, len, true);
}
static int32_t translateHLLMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateHLLImpl(pFunc, pErrBuf, len, false);
}
static bool validateStateOper(const SValueNode* pVal) {
if (TSDB_DATA_TYPE_BINARY != pVal->node.resType.type) {
return false;
@ -1020,6 +1045,22 @@ static int32_t translateCast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
*
*/
static bool validateHourRange(int8_t hour) {
if (hour < 0 || hour > 12) {
return false;
}
return true;
}
static bool validateMinuteRange(int8_t hour, int8_t minute, char sign) {
if (minute == 0 || (minute == 30 && (hour == 3 || hour == 5) && sign == '-')) {
return true;
}
return false;
}
static bool validateTimezoneFormat(const SValueNode* pVal) {
if (TSDB_DATA_TYPE_BINARY != pVal->node.resType.type) {
return false;
@ -1028,6 +1069,8 @@ static bool validateTimezoneFormat(const SValueNode* pVal) {
char* tz = varDataVal(pVal->datum.p);
int32_t len = varDataLen(pVal->datum.p);
char buf[3] = {0};
int8_t hour = -1, minute = -1;
if (len == 0) {
return false;
} else if (len == 1 && (tz[0] == 'z' || tz[0] == 'Z')) {
@ -1040,6 +1083,20 @@ static bool validateTimezoneFormat(const SValueNode* pVal) {
if (!isdigit(tz[i])) {
return false;
}
if (i == 2) {
memcpy(buf, &tz[i - 1], 2);
hour = taosStr2Int8(buf, NULL, 10);
if (!validateHourRange(hour)) {
return false;
}
} else if (i == 4) {
memcpy(buf, &tz[i - 1], 2);
minute = taosStr2Int8(buf, NULL, 10);
if (!validateMinuteRange(hour, minute, tz[0])) {
return false;
}
}
}
break;
}
@ -1051,9 +1108,24 @@ static bool validateTimezoneFormat(const SValueNode* pVal) {
}
continue;
}
if (!isdigit(tz[i])) {
return false;
}
if (i == 2) {
memcpy(buf, &tz[i - 1], 2);
hour = taosStr2Int8(buf, NULL, 10);
if (!validateHourRange(hour)) {
return false;
}
} else if (i == 5) {
memcpy(buf, &tz[i - 1], 2);
minute = taosStr2Int8(buf, NULL, 10);
if (!validateMinuteRange(hour, minute, tz[0])) {
return false;
}
}
}
break;
}
@ -1339,6 +1411,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = functionSetup,
.processFunc = topFunction,
.finalizeFunc = topBotFinalize,
.combineFunc = topCombine,
},
{
.name = "bottom",
@ -1348,7 +1421,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getTopBotFuncEnv,
.initFunc = functionSetup,
.processFunc = bottomFunction,
.finalizeFunc = topBotFinalize
.finalizeFunc = topBotFinalize,
.combineFunc = bottomCombine,
},
{
.name = "spread",
@ -1478,6 +1552,28 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getHLLFuncEnv,
.initFunc = functionSetup,
.processFunc = hllFunction,
.finalizeFunc = hllFinalize,
.pPartialFunc = "_hyperloglog_partial",
.pMergeFunc = "_hyperloglog_merge"
},
{
.name = "_hyperloglog_partial",
.type = FUNCTION_TYPE_HYPERLOGLOG_PARTIAL,
.classification = FUNC_MGT_AGG_FUNC,
.translateFunc = translateHLLPartial,
.getEnvFunc = getHLLFuncEnv,
.initFunc = functionSetup,
.processFunc = hllFunction,
.finalizeFunc = hllPartialFinalize
},
{
.name = "_hyperloglog_merge",
.type = FUNCTION_TYPE_HYPERLOGLOG_MERGE,
.classification = FUNC_MGT_AGG_FUNC,
.translateFunc = translateHLLMerge,
.getEnvFunc = getHLLFuncEnv,
.initFunc = functionSetup,
.processFunc = hllFunctionMerge,
.finalizeFunc = hllFinalize
},
{

View File

@ -1389,6 +1389,18 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple
}
}
void releaseSource(STuplePos* pPos) {
if (pPos->pageId == -1) {
return ;
}
// Todo(liuyao) relase row
}
void replaceTupleData(STuplePos* pDestPos, STuplePos* pSourcePos) {
releaseSource(pDestPos);
*pDestPos = *pSourcePos;
}
int32_t minMaxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t isMinFunc) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
SMinmaxResInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
@ -1400,10 +1412,12 @@ int32_t minMaxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3
if (pSBuf->assign &&
( (((*(double*)&pDBuf->v) < (*(double*)&pSBuf->v)) ^ isMinFunc) || !pDBuf->assign ) ) {
*(double*) &pDBuf->v = *(double*) &pSBuf->v;
replaceTupleData(&pDBuf->tuplePos, &pSBuf->tuplePos);
}
} else {
if ( pSBuf->assign && ( ((pDBuf->v < pSBuf->v) ^ isMinFunc) || !pDBuf->assign ) ) {
pDBuf->v = pSBuf->v;
replaceTupleData(&pDBuf->tuplePos, &pSBuf->tuplePos);
}
}
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
@ -2856,7 +2870,6 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
pEntryInfo->complete = true;
int32_t type = pCtx->input.pData[0]->info.type;
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
@ -2881,6 +2894,67 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return pEntryInfo->numOfRes;
}
void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type,
bool isTopQuery) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
STopBotRes* pRes = getTopBotOutputInfo(pCtx);
int32_t maxSize = pCtx->param[1].param.i;
STopBotResItem* pItems = pRes->pItems;
assert(pItems != NULL);
// not full yet
if (pEntryInfo->numOfRes < maxSize) {
STopBotResItem* pItem = &pItems[pEntryInfo->numOfRes];
pItem->v = pSourceItem->v;
pItem->uid = pSourceItem->uid;
pItem->tuplePos.pageId = -1;
replaceTupleData(&pItem->tuplePos, &pSourceItem->tuplePos);
pEntryInfo->numOfRes++;
taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn,
!isTopQuery);
} else { // replace the minimum value in the result
if ((isTopQuery && (
(IS_SIGNED_NUMERIC_TYPE(type) && pSourceItem->v.i > pItems[0].v.i) ||
(IS_UNSIGNED_NUMERIC_TYPE(type) && pSourceItem->v.u > pItems[0].v.u) ||
(IS_FLOAT_TYPE(type) && pSourceItem->v.d > pItems[0].v.d)))
|| (!isTopQuery && (
(IS_SIGNED_NUMERIC_TYPE(type) && pSourceItem->v.i < pItems[0].v.i) ||
(IS_UNSIGNED_NUMERIC_TYPE(type) && pSourceItem->v.u < pItems[0].v.u) ||
(IS_FLOAT_TYPE(type) && pSourceItem->v.d < pItems[0].v.d))
)) {
// replace the old data and the coresponding tuple data
STopBotResItem* pItem = &pItems[0];
pItem->v = pSourceItem->v;
pItem->uid = pSourceItem->uid;
// save the data of this tuple by over writing the old data
replaceTupleData(&pItem->tuplePos, &pSourceItem->tuplePos);
taosheapadjust((void*)pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void*)&type,
topBotResComparFn, NULL, !isTopQuery);
}
}
}
int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
int32_t type = pDestCtx->input.pData[0]->info.type;
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
STopBotRes* pSBuf = getTopBotOutputInfo(pSourceCtx);
for (int32_t i = 0; i < pSResInfo->numOfRes; i++) {
addResult(pDestCtx, pSBuf->pItems + i, type, true);
}
return TSDB_CODE_SUCCESS;
}
int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
int32_t type = pDestCtx->input.pData[0]->info.type;
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
STopBotRes* pSBuf = getTopBotOutputInfo(pSourceCtx);
for (int32_t i = 0; i < pSResInfo->numOfRes; i++) {
addResult(pDestCtx, pSBuf->pItems + i, type, false);
}
return TSDB_CODE_SUCCESS;
}
bool getSpreadFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SSpreadInfo);
return true;
@ -3411,7 +3485,6 @@ int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
}
int32_t histogramPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SHistoFuncInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t resultBytes = getHistogramInfoSize();
char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
@ -3428,6 +3501,10 @@ int32_t histogramPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return 1;
}
int32_t getHLLInfoSize() {
return (int32_t)sizeof(SHLLInfo);
}
bool getHLLFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SHLLInfo);
return true;
@ -3553,6 +3630,27 @@ int32_t hllFunction(SqlFunctionCtx *pCtx) {
return TSDB_CODE_SUCCESS;
}
int32_t hllFunctionMerge(SqlFunctionCtx *pCtx) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
SHLLInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t start = pInput->startRowIndex;
char* data = colDataGetData(pCol, start);
SHLLInfo* pInputInfo = (SHLLInfo *)varDataVal(data);
for (int32_t k = 0; k < HLL_BUCKETS; ++k) {
if (pInfo->buckets[k] < pInputInfo->buckets[k]) {
pInfo->buckets[k] = pInputInfo->buckets[k];
}
}
SET_VAL(GET_RES_INFO(pCtx), 1, 1);
return TSDB_CODE_SUCCESS;
}
int32_t hllFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo *pInfo = GET_RES_INFO(pCtx);
@ -3565,6 +3663,24 @@ int32_t hllFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return functionFinalize(pCtx, pBlock);
}
int32_t hllPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SHLLInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t resultBytes = getHLLInfoSize();
char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
memcpy(varDataVal(res), pInfo, resultBytes);
varDataSetLen(res, resultBytes);
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
colDataAppend(pCol, pBlock->info.rows, res, false);
taosMemoryFree(res);
return pResInfo->numOfRes;
}
bool getStateFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SStateInfo);
return true;

View File

@ -1186,6 +1186,7 @@ static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pExchange->srcGroupId = pMerge->srcGroupId;
pExchange->singleChannel = true;
pExchange->node.pParent = (SPhysiNode*)pMerge;
pExchange->node.pOutputDataBlockDesc = nodesCloneNode(pMerge->node.pOutputDataBlockDesc);
if (NULL == pExchange->node.pOutputDataBlockDesc) {

View File

@ -348,7 +348,7 @@ static FORCE_INLINE void varToNchar(char* buf, SScalarParam* pOut, int32_t rowIn
int32_t outputMaxLen = (inputLen + 1) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
char* t = taosMemoryCalloc(1, outputMaxLen);
/*int32_t resLen = */taosMbsToUcs4(varDataVal(buf), inputLen, (TdUcs4*) varDataVal(t), outputMaxLen, &len);
/*int32_t resLen = */taosMbsToUcs4(varDataVal(buf), inputLen, (TdUcs4*) varDataVal(t), outputMaxLen - VARSTR_HEADER_SIZE, &len);
varDataSetLen(t, len);
colDataAppend(pOut->columnData, rowIndex, t, false);

View File

@ -253,7 +253,9 @@ int walRoll(SWal *pWal) {
static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
SWalIdxEntry entry = {.ver = ver, .offset = offset};
int size = taosWriteFile(pWal->pWriteIdxTFile, &entry, sizeof(SWalIdxEntry));
/*int64_t idxOffset = taosLSeekFile(pWal->pWriteIdxTFile, 0, SEEK_CUR);*/
/*wDebug("write index: ver: %ld, offset: %ld, at %ld", ver, offset, idxOffset);*/
int size = taosWriteFile(pWal->pWriteIdxTFile, &entry, sizeof(SWalIdxEntry));
if (size != sizeof(SWalIdxEntry)) {
terrno = TAOS_SYSTEM_ERROR(errno);
// TODO truncate

View File

@ -174,9 +174,9 @@ int32_t taosRenameFile(const char *oldName, const char *newName) {
int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime) {
struct stat fileStat;
#ifdef WINDOWS
int32_t code = _stat(path, &fileStat);
int32_t code = _stat(path, &fileStat);
#else
int32_t code = stat(path, &fileStat);
int32_t code = stat(path, &fileStat);
#endif
if (code < 0) {
return code;
@ -201,7 +201,7 @@ int32_t taosDevInoFile(TdFilePtr pFile, int64_t *stDev, int64_t *stIno) {
#ifdef WINDOWS
BY_HANDLE_FILE_INFORMATION bhfi;
HANDLE handle = (HANDLE)_get_osfhandle(pFile->fd);
HANDLE handle = (HANDLE)_get_osfhandle(pFile->fd);
if (GetFileInformationByHandle(handle, &bhfi) == FALSE) {
printf("taosFStatFile get file info fail.");
return -1;
@ -316,12 +316,12 @@ int64_t taosCloseFile(TdFilePtr *ppFile) {
(*ppFile)->fp = NULL;
}
if ((*ppFile)->fd >= 0) {
#ifdef WINDOWS
#ifdef WINDOWS
HANDLE h = (HANDLE)_get_osfhandle((*ppFile)->fd);
!FlushFileBuffers(h);
#else
#else
fsync((*ppFile)->fd);
#endif
#endif
close((*ppFile)->fd);
(*ppFile)->fd = -1;
}
@ -345,11 +345,11 @@ int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) {
char *tbuf = (char *)buf;
while (leftbytes > 0) {
#ifdef WINDOWS
#ifdef WINDOWS
readbytes = _read(pFile->fd, (void *)tbuf, (uint32_t)leftbytes);
#else
#else
readbytes = read(pFile->fd, (void *)tbuf, (uint32_t)leftbytes);
#endif
#endif
if (readbytes < 0) {
if (errno == EINTR) {
continue;
@ -433,9 +433,6 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) {
}
int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence) {
if (pFile == NULL) {
return 0;
}
#if FILE_WITH_LOCK
taosThreadRwlockRdlock(&(pFile->rwlock));
#endif
@ -459,9 +456,9 @@ int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime) {
struct stat fileStat;
#ifdef WINDOWS
int32_t code = _fstat(pFile->fd, &fileStat);
int32_t code = _fstat(pFile->fd, &fileStat);
#else
int32_t code = fstat(pFile->fd, &fileStat);
int32_t code = fstat(pFile->fd, &fileStat);
#endif
if (code < 0) {
return code;
@ -565,12 +562,12 @@ int32_t taosFsyncFile(TdFilePtr pFile) {
if (pFile->fp != NULL) return fflush(pFile->fp);
if (pFile->fd >= 0) {
#ifdef WINDOWS
#ifdef WINDOWS
HANDLE h = (HANDLE)_get_osfhandle(pFile->fd);
return !FlushFileBuffers(h);
#else
#else
return fsync(pFile->fd);
#endif
#endif
}
return 0;
}

View File

@ -28,92 +28,48 @@
#ifdef WINDOWS
#include <time.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
//#define TM_YEAR_BASE 1970 //origin
#define TM_YEAR_BASE 1900 //slguan
#define TM_YEAR_BASE 1900 // slguan
/*
* We do not implement alternate representations. However, we always
* check whether a given modifier is allowed for a certain conversion.
*/
#define ALT_E 0x01
#define ALT_O 0x02
#define LEGAL_ALT(x) { if (alt_format & ~(x)) return (0); }
* We do not implement alternate representations. However, we always
* check whether a given modifier is allowed for a certain conversion.
*/
#define ALT_E 0x01
#define ALT_O 0x02
#define LEGAL_ALT(x) \
{ \
if (alt_format & ~(x)) return (0); \
}
static int conv_num(const char **buf, int *dest, int llim, int ulim) {
int result = 0;
static int conv_num(const char **buf, int *dest, int llim, int ulim)
{
int result = 0;
/* The limit also determines the number of valid digits. */
int rulim = ulim;
/* The limit also determines the number of valid digits. */
int rulim = ulim;
if (**buf < '0' || **buf > '9') return (0);
if (**buf < '0' || **buf > '9')
return (0);
do {
result *= 10;
result += *(*buf)++ - '0';
rulim /= 10;
} while ((result * 10 <= ulim) && rulim && **buf >= '0' && **buf <= '9');
do {
result *= 10;
result += *(*buf)++ - '0';
rulim /= 10;
} while ((result * 10 <= ulim) && rulim && **buf >= '0' && **buf <= '9');
if (result < llim || result > ulim) return (0);
if (result < llim || result > ulim)
return (0);
*dest = result;
return (1);
*dest = result;
return (1);
}
static const char *day[7] = {
"Sunday", "Monday", "Tuesday", "Wednesday", "Thursday",
"Friday", "Saturday"
};
static const char *abday[7] = {
"Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"
};
static const char *mon[12] = {
"January", "February", "March", "April", "May", "June", "July",
"August", "September", "October", "November", "December"
};
static const char *abmon[12] = {
"Jan", "Feb", "Mar", "Apr", "May", "Jun",
"Jul", "Aug", "Sep", "Oct", "Nov", "Dec"
};
static const char *am_pm[2] = {
"AM", "PM"
};
#define BILLION (1E9)
static BOOL g_first_time = 1;
static LARGE_INTEGER g_counts_per_sec;
int clock_gettime(int dummy, struct timespec *ct)
{
LARGE_INTEGER count;
if (g_first_time)
{
g_first_time = 0;
if (0 == QueryPerformanceFrequency(&g_counts_per_sec))
{
g_counts_per_sec.QuadPart = 0;
}
}
if ((NULL == ct) || (g_counts_per_sec.QuadPart <= 0) ||
(0 == QueryPerformanceCounter(&count)))
{
return -1;
}
ct->tv_sec = count.QuadPart / g_counts_per_sec.QuadPart;
ct->tv_nsec = ((count.QuadPart % g_counts_per_sec.QuadPart) * BILLION) / g_counts_per_sec.QuadPart;
return 0;
}
static const char *day[7] = {"Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday"};
static const char *abday[7] = {"Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"};
static const char *mon[12] = {"January", "February", "March", "April", "May", "June",
"July", "August", "September", "October", "November", "December"};
static const char *abmon[12] = {"Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"};
static const char *am_pm[2] = {"AM", "PM"};
#else
#include <sys/time.h>
@ -121,301 +77,265 @@ int clock_gettime(int dummy, struct timespec *ct)
char *taosStrpTime(const char *buf, const char *fmt, struct tm *tm) {
#ifdef WINDOWS
char c;
const char *bp;
size_t len = 0;
int alt_format, i, split_year = 0;
char c;
const char *bp;
size_t len = 0;
int alt_format, i, split_year = 0;
bp = buf;
bp = buf;
while ((c = *fmt) != '\0') {
/* Clear `alternate' modifier prior to new conversion. */
alt_format = 0;
while ((c = *fmt) != '\0') {
/* Clear `alternate' modifier prior to new conversion. */
alt_format = 0;
/* Eat up white-space. */
if (isspace(c)) {
while (isspace(*bp))
bp++;
/* Eat up white-space. */
if (isspace(c)) {
while (isspace(*bp)) bp++;
fmt++;
continue;
}
fmt++;
continue;
}
if ((c = *fmt++) != '%')
goto literal;
if ((c = *fmt++) != '%') goto literal;
again: switch (c = *fmt++) {
case '%': /* "%%" is converted to "%". */
literal :
if (c != *bp++)
return (0);
again:
switch (c = *fmt++) {
case '%': /* "%%" is converted to "%". */
literal:
if (c != *bp++) return (0);
break;
/*
* "Alternative" modifiers. Just set the appropriate flag
* and start over again.
*/
case 'E': /* "%E?" alternative conversion modifier. */
* "Alternative" modifiers. Just set the appropriate flag
* and start over again.
*/
case 'E': /* "%E?" alternative conversion modifier. */
LEGAL_ALT(0);
alt_format |= ALT_E;
goto again;
case 'O': /* "%O?" alternative conversion modifier. */
case 'O': /* "%O?" alternative conversion modifier. */
LEGAL_ALT(0);
alt_format |= ALT_O;
goto again;
/*
* "Complex" conversion rules, implemented through recursion.
*/
case 'c': /* Date and time, using the locale's format. */
* "Complex" conversion rules, implemented through recursion.
*/
case 'c': /* Date and time, using the locale's format. */
LEGAL_ALT(ALT_E);
if (!(bp = taosStrpTime(bp, "%x %X", tm)))
return (0);
if (!(bp = taosStrpTime(bp, "%x %X", tm))) return (0);
break;
case 'D': /* The date as "%m/%d/%y". */
case 'D': /* The date as "%m/%d/%y". */
LEGAL_ALT(0);
if (!(bp = taosStrpTime(bp, "%m/%d/%y", tm)))
return (0);
if (!(bp = taosStrpTime(bp, "%m/%d/%y", tm))) return (0);
break;
case 'R': /* The time as "%H:%M". */
case 'R': /* The time as "%H:%M". */
LEGAL_ALT(0);
if (!(bp = taosStrpTime(bp, "%H:%M", tm)))
return (0);
if (!(bp = taosStrpTime(bp, "%H:%M", tm))) return (0);
break;
case 'r': /* The time in 12-hour clock representation. */
case 'r': /* The time in 12-hour clock representation. */
LEGAL_ALT(0);
if (!(bp = taosStrpTime(bp, "%I:%M:%S %p", tm)))
return (0);
if (!(bp = taosStrpTime(bp, "%I:%M:%S %p", tm))) return (0);
break;
case 'T': /* The time as "%H:%M:%S". */
case 'T': /* The time as "%H:%M:%S". */
LEGAL_ALT(0);
if (!(bp = taosStrpTime(bp, "%H:%M:%S", tm)))
return (0);
if (!(bp = taosStrpTime(bp, "%H:%M:%S", tm))) return (0);
break;
case 'X': /* The time, using the locale's format. */
case 'X': /* The time, using the locale's format. */
LEGAL_ALT(ALT_E);
if (!(bp = taosStrpTime(bp, "%H:%M:%S", tm)))
return (0);
if (!(bp = taosStrpTime(bp, "%H:%M:%S", tm))) return (0);
break;
case 'x': /* The date, using the locale's format. */
case 'x': /* The date, using the locale's format. */
LEGAL_ALT(ALT_E);
if (!(bp = taosStrpTime(bp, "%m/%d/%y", tm)))
return (0);
if (!(bp = taosStrpTime(bp, "%m/%d/%y", tm))) return (0);
break;
/*
* "Elementary" conversion rules.
*/
case 'A': /* The day of week, using the locale's form. */
case 'a':
* "Elementary" conversion rules.
*/
case 'A': /* The day of week, using the locale's form. */
case 'a':
LEGAL_ALT(0);
for (i = 0; i < 7; i++) {
/* Full name. */
len = strlen(day[i]);
if (strncmp(day[i], bp, len) == 0)
break;
/* Full name. */
len = strlen(day[i]);
if (strncmp(day[i], bp, len) == 0) break;
/* Abbreviated name. */
len = strlen(abday[i]);
if (strncmp(abday[i], bp, len) == 0)
break;
/* Abbreviated name. */
len = strlen(abday[i]);
if (strncmp(abday[i], bp, len) == 0) break;
}
/* Nothing matched. */
if (i == 7)
return (0);
if (i == 7) return (0);
tm->tm_wday = i;
bp += len;
break;
case 'B': /* The month, using the locale's form. */
case 'b':
case 'h':
case 'B': /* The month, using the locale's form. */
case 'b':
case 'h':
LEGAL_ALT(0);
for (i = 0; i < 12; i++) {
/* Full name. */
len = strlen(mon[i]);
if (strncmp(mon[i], bp, len) == 0)
break;
/* Full name. */
len = strlen(mon[i]);
if (strncmp(mon[i], bp, len) == 0) break;
/* Abbreviated name. */
len = strlen(abmon[i]);
if (strncmp(abmon[i], bp, len) == 0)
break;
/* Abbreviated name. */
len = strlen(abmon[i]);
if (strncmp(abmon[i], bp, len) == 0) break;
}
/* Nothing matched. */
if (i == 12)
return (0);
if (i == 12) return (0);
tm->tm_mon = i;
bp += len;
break;
case 'C': /* The century number. */
case 'C': /* The century number. */
LEGAL_ALT(ALT_E);
if (!(conv_num(&bp, &i, 0, 99)))
return (0);
if (!(conv_num(&bp, &i, 0, 99))) return (0);
if (split_year) {
tm->tm_year = (tm->tm_year % 100) + (i * 100);
}
else {
tm->tm_year = i * 100;
split_year = 1;
tm->tm_year = (tm->tm_year % 100) + (i * 100);
} else {
tm->tm_year = i * 100;
split_year = 1;
}
break;
case 'd': /* The day of month. */
case 'e':
case 'd': /* The day of month. */
case 'e':
LEGAL_ALT(ALT_O);
if (!(conv_num(&bp, &tm->tm_mday, 1, 31)))
return (0);
if (!(conv_num(&bp, &tm->tm_mday, 1, 31))) return (0);
break;
case 'k': /* The hour (24-hour clock representation). */
case 'k': /* The hour (24-hour clock representation). */
LEGAL_ALT(0);
/* FALLTHROUGH */
case 'H':
case 'H':
LEGAL_ALT(ALT_O);
if (!(conv_num(&bp, &tm->tm_hour, 0, 23)))
return (0);
if (!(conv_num(&bp, &tm->tm_hour, 0, 23))) return (0);
break;
case 'l': /* The hour (12-hour clock representation). */
case 'l': /* The hour (12-hour clock representation). */
LEGAL_ALT(0);
/* FALLTHROUGH */
case 'I':
case 'I':
LEGAL_ALT(ALT_O);
if (!(conv_num(&bp, &tm->tm_hour, 1, 12)))
return (0);
if (tm->tm_hour == 12)
tm->tm_hour = 0;
if (!(conv_num(&bp, &tm->tm_hour, 1, 12))) return (0);
if (tm->tm_hour == 12) tm->tm_hour = 0;
break;
case 'j': /* The day of year. */
case 'j': /* The day of year. */
LEGAL_ALT(0);
if (!(conv_num(&bp, &i, 1, 366)))
return (0);
if (!(conv_num(&bp, &i, 1, 366))) return (0);
tm->tm_yday = i - 1;
break;
case 'M': /* The minute. */
case 'M': /* The minute. */
LEGAL_ALT(ALT_O);
if (!(conv_num(&bp, &tm->tm_min, 0, 59)))
return (0);
if (!(conv_num(&bp, &tm->tm_min, 0, 59))) return (0);
break;
case 'm': /* The month. */
case 'm': /* The month. */
LEGAL_ALT(ALT_O);
if (!(conv_num(&bp, &i, 1, 12)))
return (0);
if (!(conv_num(&bp, &i, 1, 12))) return (0);
tm->tm_mon = i - 1;
break;
case 'p': /* The locale's equivalent of AM/PM. */
case 'p': /* The locale's equivalent of AM/PM. */
LEGAL_ALT(0);
/* AM? */
if (strcmp(am_pm[0], bp) == 0) {
if (tm->tm_hour > 11)
return (0);
if (tm->tm_hour > 11) return (0);
bp += strlen(am_pm[0]);
break;
bp += strlen(am_pm[0]);
break;
}
/* PM? */
else if (strcmp(am_pm[1], bp) == 0) {
if (tm->tm_hour > 11)
return (0);
if (tm->tm_hour > 11) return (0);
tm->tm_hour += 12;
bp += strlen(am_pm[1]);
break;
tm->tm_hour += 12;
bp += strlen(am_pm[1]);
break;
}
/* Nothing matched. */
return (0);
case 'S': /* The seconds. */
case 'S': /* The seconds. */
LEGAL_ALT(ALT_O);
if (!(conv_num(&bp, &tm->tm_sec, 0, 61)))
return (0);
if (!(conv_num(&bp, &tm->tm_sec, 0, 61))) return (0);
break;
case 'U': /* The week of year, beginning on sunday. */
case 'W': /* The week of year, beginning on monday. */
case 'U': /* The week of year, beginning on sunday. */
case 'W': /* The week of year, beginning on monday. */
LEGAL_ALT(ALT_O);
/*
* XXX This is bogus, as we can not assume any valid
* information present in the tm structure at this
* point to calculate a real value, so just check the
* range for now.
*/
if (!(conv_num(&bp, &i, 0, 53)))
return (0);
* XXX This is bogus, as we can not assume any valid
* information present in the tm structure at this
* point to calculate a real value, so just check the
* range for now.
*/
if (!(conv_num(&bp, &i, 0, 53))) return (0);
break;
case 'w': /* The day of week, beginning on sunday. */
case 'w': /* The day of week, beginning on sunday. */
LEGAL_ALT(ALT_O);
if (!(conv_num(&bp, &tm->tm_wday, 0, 6)))
return (0);
if (!(conv_num(&bp, &tm->tm_wday, 0, 6))) return (0);
break;
case 'Y': /* The year. */
case 'Y': /* The year. */
LEGAL_ALT(ALT_E);
if (!(conv_num(&bp, &i, 0, 9999)))
return (0);
if (!(conv_num(&bp, &i, 0, 9999))) return (0);
tm->tm_year = i - TM_YEAR_BASE;
break;
case 'y': /* The year within 100 years of the epoch. */
case 'y': /* The year within 100 years of the epoch. */
LEGAL_ALT(ALT_E | ALT_O);
if (!(conv_num(&bp, &i, 0, 99)))
return (0);
if (!(conv_num(&bp, &i, 0, 99))) return (0);
if (split_year) {
tm->tm_year = ((tm->tm_year / 100) * 100) + i;
break;
tm->tm_year = ((tm->tm_year / 100) * 100) + i;
break;
}
split_year = 1;
if (i <= 68)
tm->tm_year = i + 2000 - TM_YEAR_BASE;
tm->tm_year = i + 2000 - TM_YEAR_BASE;
else
tm->tm_year = i + 1900 - TM_YEAR_BASE;
tm->tm_year = i + 1900 - TM_YEAR_BASE;
break;
/*
* Miscellaneous conversions.
*/
case 'n': /* Any kind of white-space. */
case 't':
* Miscellaneous conversions.
*/
case 'n': /* Any kind of white-space. */
case 't':
LEGAL_ALT(0);
while (isspace(*bp))
bp++;
while (isspace(*bp)) bp++;
break;
default: /* Unknown/unsupported conversion. */
default: /* Unknown/unsupported conversion. */
return (0);
}
}
}
/* LINTED functional specification */
return ((char *)bp);
/* LINTED functional specification */
return ((char *)bp);
#else
return strptime(buf, fmt, tm);
return strptime(buf, fmt, tm);
#endif
}
@ -435,13 +355,9 @@ FORCE_INLINE int32_t taosGetTimeOfDay(struct timeval *tv) {
#endif
}
time_t taosTime(time_t *t) {
return time(t);
}
time_t taosTime(time_t *t) { return time(t); }
time_t taosMktime(struct tm *timep) {
return mktime(timep);
}
time_t taosMktime(struct tm *timep) { return mktime(timep); }
struct tm *taosLocalTime(const time_t *timep, struct tm *result) {
if (result == NULL) {
@ -456,5 +372,36 @@ struct tm *taosLocalTime(const time_t *timep, struct tm *result) {
}
int32_t taosGetTimestampSec() { return (int32_t)time(NULL); }
int32_t taosClockGetTime(int clock_id, struct timespec *pTS) {
#ifdef WINDOWS
LARGE_INTEGER t;
FILETIME f;
static FILETIME ff;
static SYSTEMTIME ss;
static LARGE_INTEGER offset;
int32_t taosClockGetTime(int clock_id, struct timespec *pTS) { return clock_gettime(clock_id, pTS); }
ss.wYear = 1970;
ss.wMonth = 1;
ss.wDay = 1;
ss.wHour = 0;
ss.wMinute = 0;
ss.wSecond = 0;
ss.wMilliseconds = 0;
SystemTimeToFileTime(&ss, &ff);
offset.QuadPart = ff.dwHighDateTime;
offset.QuadPart <<= 32;
offset.QuadPart |= ff.dwLowDateTime;
GetSystemTimeAsFileTime(&f);
t.QuadPart = f.dwHighDateTime;
t.QuadPart <<= 32;
t.QuadPart |= f.dwLowDateTime;
t.QuadPart -= offset.QuadPart;
pTS->tv_sec = t.QuadPart / 10000000;
pTS->tv_nsec = (t.QuadPart % 10000000)*100;
return (0);
#else
return clock_gettime(clock_id, pTS);
#endif
}

View File

@ -321,6 +321,7 @@ class TDDnode:
self.remoteExec(self.cfgDict, "tdDnodes.dnodes[%d].deployed=1\ntdDnodes.dnodes[%d].logDir=\"%%s/sim/dnode%%d/log\"%%(tdDnodes.dnodes[%d].path,%d)\ntdDnodes.dnodes[%d].cfgDir=\"%%s/sim/dnode%%d/cfg\"%%(tdDnodes.dnodes[%d].path,%d)\ntdDnodes.start(%d)"%(self.index-1,self.index-1,self.index-1,self.index,self.index-1,self.index-1,self.index,self.index))
self.running = 1
else:
os.system("rm -rf %s/taosdlog.0"%self.logDir)
if os.system(cmd) != 0:
tdLog.exit(cmd)
self.running = 1
@ -338,8 +339,6 @@ class TDDnode:
if i > 50:
break
tailCmdStr = 'tail -f '
if platform.system().lower() == 'windows':
tailCmdStr = 'tail -n +0 -f '
popen = subprocess.Popen(
tailCmdStr + logFile,
stdout=subprocess.PIPE,
@ -574,6 +573,9 @@ class TDDnodes:
def stopAll(self):
tdLog.info("stop all dnodes")
if (not self.dnodes[0].remoteIP == ""):
self.dnodes[0].remoteExec(self.dnodes[0].cfgDict, "for i in range(len(tdDnodes.dnodes)):\n tdDnodes.dnodes[i].running=1\ntdDnodes.stopAll()")
return
for i in range(len(self.dnodes)):
self.dnodes[i].stop()

View File

@ -17,7 +17,7 @@ sql use test
sql create table t1(ts timestamp, a int, b int , c int, d double,id int);
sql create stream streams2 trigger at_once into streamt as select _wstartts, count(*) c1, sum(a), max(a), min(d), stddev(a), last(a), first(d), max(id) s from t1 session(ts,10s);
sql create stream streams1 trigger at_once into streamt as select _wstartts, count(*) c1, sum(a), max(a), min(d), stddev(a), last(a), first(d), max(id) s from t1 session(ts,10s);
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL,1);
sql insert into t1 values(1648791223001,10,2,3,1.1,2);
sql insert into t1 values(1648791233002,3,2,3,2.1,3);
@ -176,4 +176,107 @@ if $data08 != 13 then
return -1
endi
sql create database test2 vgroups 1;
sql use test2;
sql create table t2(ts timestamp, a int, b int , c int, d double, id int);
sql create stream streams2 trigger at_once watermark 1d into streamt2 as select _wstartts,apercentile(a,30) c1, apercentile(a,70), apercentile(a,20,"t-digest") c2, apercentile(a,60,"t-digest") c3, max(id) c4 from t2 session(ts,10s);
sql insert into t2 values(1648791213001,1,1,3,1.0,1);
sql insert into t2 values(1648791213002,2,2,6,3.4,2);
sql insert into t2 values(1648791213003,4,9,3,4.8,3);
sql insert into t2 values(1648791233003,3,4,3,2.1,4);
sql insert into t2 values(1648791233004,3,5,3,3.4,5);
sql insert into t2 values(1648791233005,3,6,3,7.6,6);
#
sql insert into t2 values(1648791223003,20,7,3,10.1,7);
$loop_count = 0
loop2:
sleep 300
sql select * from streamt2 where c4=7;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 1 then
print ======$rows
goto loop2
endi
# row 0
if $data01 != 2.091607978 then
print =====data01=$data01
goto loop2
endi
if $data02 != 3.274823935 then
print =====data02=$data02
goto loop2
endi
if $data03 != 1.800000000 then
print ======$data03
return -1
endi
if $data04 != 3.350000000 then
print ======$data04
return -1
endi
sql create database test3 vgroups 1;
sql use test3;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams3 trigger at_once watermark 1d into streamt3 as select _wstartts, min(b), a,c from t1 session(ts,10s);
sql create stream streams4 trigger at_once watermark 1d into streamt4 as select _wstartts, max(b), a,c from t1 session(ts,10s);
sql create stream streams5 trigger at_once watermark 1d into streamt5 as select _wstartts, max(b), a,c from t1 session(ts,10s);
sql create stream streams6 trigger at_once watermark 1d into streamt6 as select _wstartts, max(b), a,c from t1 session(ts,10s);
sql insert into t1 values(1648791213001,1,1,1,1.0);
sql insert into t1 values(1648791213002,2,3,2,3.4);
sql insert into t1 values(1648791213003,4,9,3,4.8);
sql insert into t1 values(1648791213004,4,5,4,4.8);
sql insert into t1 values(1648791233004,3,4,0,2.1);
sql insert into t1 values(1648791233005,3,0,6,3.4);
sql insert into t1 values(1648791233006,3,6,7,7.6);
sql insert into t1 values(1648791233007,3,13,8,7.6);
sql insert into t1 values(1648791223004,20,7,9,10.1);
$loop_count = 0
loop3:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from streamt3;
if $rows == 0 then
print ======$rows
goto loop3
endi
sql select * from streamt4;
if $rows == 0 then
print ======$rows
goto loop3
endi
sql select * from streamt5;
if $rows == 0 then
print ======$rows
goto loop3
endi
sql select * from streamt6;
if $rows == 0 then
print ======$rows
goto loop3
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -23,7 +23,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]

View File

@ -37,7 +37,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]

View File

@ -9,8 +9,9 @@ from util.sql import *
from util.cases import *
from util.dnodes import *
import subprocess
# import win32gui
# import threading
if (platform.system().lower() == 'windows'):
import win32gui
import threading
class TDTestCase:
@ -535,17 +536,18 @@ class TDTestCase:
return udf1_sqls ,udf2_sqls
# def checkRunTimeError(self):
# while 1:
# time.sleep(1)
# hwnd = win32gui.FindWindow(None, "Microsoft Visual C++ Runtime Library")
# if hwnd:
# os.system("TASKKILL /F /IM udfd.exe")
def checkRunTimeError(self):
if (platform.system().lower() == 'windows' and tdDnodes.dnodes[0].remoteIP == ""):
while 1:
time.sleep(1)
hwnd = win32gui.FindWindow(None, "Microsoft Visual C++ Runtime Library")
if hwnd:
os.system("TASKKILL /F /IM udfd.exe")
def unexpected_create(self):
# if (platform.system().lower() == 'windows' and tdDnodes.dnodes[0].remoteIP == ""):
# checkErrorThread = threading.Thread(target=self.checkRunTimeError,daemon=True)
# checkErrorThread.start()
if (platform.system().lower() == 'windows' and tdDnodes.dnodes[0].remoteIP == ""):
checkErrorThread = threading.Thread(target=self.checkRunTimeError,daemon=True)
checkErrorThread.start()
tdLog.info(" create function with out bufsize ")
tdSql.query("drop function udf1 ")

View File

@ -3,6 +3,7 @@ import taos
import time
import inspect
import traceback
import socket
from dataclasses import dataclass
from util.log import *
@ -102,7 +103,7 @@ class TDconnect:
def taos_connect(
host = "127.0.0.1",
host = socket.gethostname(),
port = 6030,
user = "root",
passwd = "taosdata",

View File

@ -54,7 +54,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")]

View File

@ -52,7 +52,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")]

View File

@ -49,7 +49,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")]

View File

@ -49,7 +49,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")]

View File

@ -49,7 +49,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")]

View File

@ -400,7 +400,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]

View File

@ -3,7 +3,10 @@ from util.log import *
from util.sql import *
from util.cases import *
import platform
import os
if platform.system().lower() == 'windows':
import tzlocal
class TDTestCase:
@ -15,16 +18,20 @@ class TDTestCase:
def run(self): # sourcery skip: extract-duplicate-method
tdSql.prepare()
# get system timezone
time_zone_arr = os.popen('timedatectl | grep zone').read(
).strip().split(':')
if len(time_zone_arr) > 1:
time_zone = time_zone_arr[1].lstrip()
else:
# possibly in a docker container
time_zone_1 = os.popen('ls -l /etc/localtime|awk -F/ \'{print $(NF-1) "/" $NF}\'').read().strip()
time_zone_2 = os.popen('date "+(%Z, %z)"').read().strip()
if platform.system().lower() == 'windows':
time_zone_1 = tzlocal.get_localzone_name()
time_zone_2 = time.strftime('(UTC, %z)')
time_zone = time_zone_1 + " " + time_zone_2
print("expected time zone: " + time_zone)
else:
time_zone_arr = os.popen('timedatectl | grep zone').read().strip().split(':')
if len(time_zone_arr) > 1:
time_zone = time_zone_arr[1].lstrip()
else:
# possibly in a docker container
time_zone_1 = os.popen('ls -l /etc/localtime|awk -F/ \'{print $(NF-1) "/" $NF}\'').read().strip()
time_zone_2 = os.popen('date "+(%Z, %z)"').read().strip()
time_zone = time_zone_1 + " " + time_zone_2
print("expected time zone: " + time_zone)
tdLog.printNoPrefix("==========step1:create tables==========")
tdSql.execute(

View File

@ -13,6 +13,12 @@ from util.dnodes import *
class TDTestCase:
hostname = socket.gethostname()
if (platform.system().lower() == 'windows' and not tdDnodes.dnodes[0].remoteIP == ""):
try:
config = eval(tdDnodes.dnodes[0].remoteIP)
hostname = config["host"]
except Exception:
hostname = tdDnodes.dnodes[0].remoteIP
#rpcDebugFlagVal = '143'
#clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
#clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
@ -34,7 +40,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
@ -192,7 +198,10 @@ class TDTestCase:
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
if (platform.system().lower() == 'windows'):
shellCmd += "> nul 2>&1 &"
else:
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd)
os.system(shellCmd)
@ -306,7 +315,10 @@ class TDTestCase:
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
if (platform.system().lower() == 'windows'):
shellCmd += "> nul 2>&1 &"
else:
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd)
os.system(shellCmd)
@ -438,7 +450,10 @@ class TDTestCase:
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
if (platform.system().lower() == 'windows'):
shellCmd += "> nul 2>&1 &"
else:
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd)
os.system(shellCmd)

View File

@ -41,7 +41,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]

View File

@ -41,7 +41,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]

View File

@ -34,7 +34,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]

View File

@ -34,7 +34,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]

View File

@ -34,7 +34,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]

View File

@ -41,7 +41,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]

View File

@ -41,7 +41,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]

View File

@ -41,7 +41,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]

View File

@ -41,7 +41,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]

View File

@ -41,7 +41,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]

View File

@ -41,7 +41,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]

View File

@ -41,7 +41,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]

View File

@ -41,7 +41,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]

View File

@ -34,7 +34,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]

View File

@ -33,7 +33,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]

View File

@ -34,7 +34,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]

View File

@ -34,7 +34,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]

View File

@ -41,7 +41,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]

View File

@ -1,12 +1,12 @@
python3 .\test.py -f 0-others\taosShell.py
python3 .\test.py -f 0-others\taosShellError.py
@REM python3 .\test.py -f 0-others\taosShell.py
@REM python3 .\test.py -f 0-others\taosShellError.py
python3 .\test.py -f 0-others\taosShellNetChk.py
python3 .\test.py -f 0-others\telemetry.py
python3 .\test.py -f 0-others\taosdMonitor.py
python3 .\test.py -f 0-others\udfTest.py
python3 .\test.py -f 0-others\udf_create.py
python3 .\test.py -f 0-others\udf_restart_taosd.py
@REM python3 .\test.py -f 0-others\udf_restart_taosd.py
@REM python3 .\test.py -f 0-others\cachelast.py
@REM python3 .\test.py -f 0-others\user_control.py

View File

@ -2,14 +2,7 @@
SETLOCAL EnableDelayedExpansion
for /F "tokens=1,2 delims=#" %%a in ('"prompt #$H#$E# & echo on & for %%b in (1) do rem"') do ( set "DEL=%%a")
set /a a=0
@REM echo Windows Taosd Test
@REM for /F "usebackq tokens=*" %%i in (fulltest.bat) do (
@REM echo Processing %%i
@REM set /a a+=1
@REM call %%i ARG1 > result_!a!.txt 2>error_!a!.txt
@REM if errorlevel 1 ( call :colorEcho 0c "failed" &echo. && exit 8 ) else ( call :colorEcho 0a "Success" &echo. )
@REM )
echo Linux Taosd Test
echo Windows Taosd Test
for /F "usebackq tokens=*" %%i in (fulltest.bat) do (
for /f "tokens=1* delims= " %%a in ("%%i") do if not "%%a" == "@REM" (
echo Processing %%i
@ -17,10 +10,22 @@ for /F "usebackq tokens=*" %%i in (fulltest.bat) do (
set time1=!_timeTemp!
echo Start at %time%
set /a a+=1
call %%i ARG1 -m %1 > result_!a!.txt 2>error_!a!.txt
call %%i ARG1 > result_!a!.txt 2>error_!a!.txt
if errorlevel 1 ( call :colorEcho 0c "failed" &echo. && echo result: && cat result_!a!.txt && echo error: && cat error_!a!.txt && exit 8 ) else ( call :colorEcho 0a "Success" &echo. )
)
)
@REM echo Linux Taosd Test
@REM for /F "usebackq tokens=*" %%i in (fulltest.bat) do (
@REM for /f "tokens=1* delims= " %%a in ("%%i") do if not "%%a" == "@REM" (
@REM echo Processing %%i
@REM call :GetTimeSeconds %time%
@REM set time1=!_timeTemp!
@REM echo Start at %time%
@REM set /a a+=1
@REM call %%i ARG1 -m %1 > result_!a!.txt 2>error_!a!.txt
@REM if errorlevel 1 ( call :colorEcho 0c "failed" &echo. && echo result: && cat result_!a!.txt && echo error: && cat error_!a!.txt && exit 8 ) else ( call :colorEcho 0a "Success" &echo. )
@REM )
@REM )
exit
:colorEcho

View File

@ -20,6 +20,7 @@ import time
import base64
import json
import platform
import socket
from distutils.log import warn as printf
from fabric2 import Connection
sys.path.append("../pytest")
@ -149,7 +150,7 @@ if __name__ == "__main__":
tdLog.info('stop All dnodes')
if masterIp == "":
host = '127.0.0.1'
host = socket.gethostname()
else:
try:
config = eval(masterIp)
@ -170,8 +171,8 @@ if __name__ == "__main__":
try:
if key_word in open(fileName, encoding='UTF-8').read():
is_test_framework = 1
except:
pass
except Exception as r:
print(r)
updateCfgDictStr = ''
if is_test_framework:
moduleName = fileName.replace(".py", "").replace(os.sep, ".")
@ -181,8 +182,8 @@ if __name__ == "__main__":
if ((json.dumps(updateCfgDict) == '{}') and (ucase.updatecfgDict is not None)):
updateCfgDict = ucase.updatecfgDict
updateCfgDictStr = "-d %s"%base64.b64encode(json.dumps(updateCfgDict).encode()).decode()
except :
pass
except Exception as r:
print(r)
else:
pass
tdDnodes.deploy(1,updateCfgDict)