other: merge 3.0

This commit is contained in:
Haojun Liao 2024-08-02 09:16:03 +08:00
commit 27cadbbcb6
14 changed files with 147 additions and 53 deletions

View File

@ -80,8 +80,8 @@ if [ -f %{_compiledir}/../../../explorer/target/taos-explorer.service ]; then
cp %{_compiledir}/../../../explorer/target/taos-explorer.service %{buildroot}%{homepath}/cfg ||: cp %{_compiledir}/../../../explorer/target/taos-explorer.service %{buildroot}%{homepath}/cfg ||:
fi fi
if [ -f %{_compiledir}/../../../explorer/server/example/explorer.toml ]; then if [ -f %{_compiledir}/../../../explorer/server/examples/explorer.toml ]; then
cp %{_compiledir}/../../../explorer/server/example/explorer.toml %{buildroot}%{homepath}/cfg ||: cp %{_compiledir}/../../../explorer/server/examples/explorer.toml %{buildroot}%{homepath}/cfg ||:
fi fi
#cp %{_compiledir}/../packaging/rpm/taosd %{buildroot}%{homepath}/init.d #cp %{_compiledir}/../packaging/rpm/taosd %{buildroot}%{homepath}/init.d

View File

@ -247,7 +247,7 @@ typedef struct {
SMqCommitCbParamSet* params; SMqCommitCbParamSet* params;
char topicName[TSDB_TOPIC_FNAME_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
int32_t vgId; int32_t vgId;
tmq_t* pTmq; int64_t consumerId;
} SMqCommitCbParam; } SMqCommitCbParam;
typedef struct SSyncCommitInfo { typedef struct SSyncCommitInfo {
@ -485,7 +485,7 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
taosMemoryFree(pBuf->pData); taosMemoryFree(pBuf->pData);
taosMemoryFree(pBuf->pEpSet); taosMemoryFree(pBuf->pEpSet);
return commitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId); return commitRspCountDown(pParamSet, pParam->consumerId, pParam->topicName, pParam->vgId);
} }
static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName, static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName,
@ -529,7 +529,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse
pParam->params = pParamSet; pParam->params = pParamSet;
pParam->vgId = vgId; pParam->vgId = vgId;
pParam->pTmq = tmq; pParam->consumerId = tmq->consumerId;
tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName)); tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName));
@ -1505,22 +1505,22 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tmq_t* tmq = NULL; tmq_t* tmq = NULL;
SMqPollCbParam* pParam = (SMqPollCbParam*)param; SMqPollCbParam* pParam = (SMqPollCbParam*)param;
if (pParam == NULL || pMsg == NULL) { if (pParam == NULL || pMsg == NULL) {
goto FAIL2; return TSDB_CODE_TSC_INTERNAL_ERROR;
} }
int64_t refId = pParam->refId; int64_t refId = pParam->refId;
int32_t vgId = pParam->vgId; int32_t vgId = pParam->vgId;
uint64_t requestId = pParam->requestId; uint64_t requestId = pParam->requestId;
tmq = taosAcquireRef(tmqMgmt.rsetId, refId); tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq == NULL) { if (tmq == NULL) {
code = TSDB_CODE_TMQ_CONSUMER_CLOSED; return TSDB_CODE_TMQ_CONSUMER_CLOSED;
goto FAIL2;
} }
SMqPollRspWrapper* pRspWrapper = NULL; SMqPollRspWrapper* pRspWrapper = NULL;
code = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0, (void**)&pRspWrapper); int32_t ret = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0, (void**)&pRspWrapper);
if (code) { if (ret) {
code = ret;
tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId); tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId);
goto FAIL1; goto END;
} }
if (code != 0) { if (code != 0) {
@ -1603,25 +1603,23 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
} }
END: END:
pRspWrapper->code = code; if (pRspWrapper){
pRspWrapper->vgId = vgId; pRspWrapper->code = code;
(void)strcpy(pRspWrapper->topicName, pParam->topicName); pRspWrapper->vgId = vgId;
code = taosWriteQitem(tmq->mqueue, pRspWrapper); (void)strcpy(pRspWrapper->topicName, pParam->topicName);
if(code != 0){ code = taosWriteQitem(tmq->mqueue, pRspWrapper);
tscError("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code); if(code != 0){
tscError("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code);
}
} }
int32_t total = taosQueueItemSize(tmq->mqueue); int32_t total = taosQueueItemSize(tmq->mqueue);
tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64, tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64,
tmq->consumerId, rspType, vgId, total, requestId); tmq->consumerId, rspType, vgId, total, requestId);
FAIL1:
(void)taosReleaseRef(tmqMgmt.rsetId, refId);
FAIL2:
if (tmq) (void)tsem2_post(&tmq->rspSem); if (tmq) (void)tsem2_post(&tmq->rspSem);
if (pMsg) taosMemoryFreeClear(pMsg->pData); if (pMsg) taosMemoryFreeClear(pMsg->pData);
if (pMsg) taosMemoryFreeClear(pMsg->pEpSet); if (pMsg) taosMemoryFreeClear(pMsg->pEpSet);
(void)taosReleaseRef(tmqMgmt.rsetId, refId);
return code; return code;
} }

View File

@ -1178,6 +1178,13 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size,
&getObjectDataCallback}; &getObjectDataCallback};
TS3SizeCBD cbd = {0}; TS3SizeCBD cbd = {0};
int retryCount = 0;
static int maxRetryCount = 5;
static int minRetryInterval = 1000; // ms
static int maxRetryInterval = 3000; // ms
_retry:
(void)memset(&cbd, 0, sizeof(cbd));
cbd.content_length = size; cbd.content_length = size;
cbd.buf_pos = 0; cbd.buf_pos = 0;
do { do {
@ -1185,6 +1192,11 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size,
} while (S3_status_is_retryable(cbd.status) && should_retry()); } while (S3_status_is_retryable(cbd.status) && should_retry());
if (cbd.status != S3StatusOK) { if (cbd.status != S3StatusOK) {
if (S3StatusErrorSlowDown == cbd.status && retryCount++ < maxRetryCount) {
taosMsleep(taosRand() % (maxRetryInterval - minRetryInterval + 1) + minRetryInterval);
uInfo("%s: %d/%s(%s) retry get object", __func__, cbd.status, S3_get_status_name(cbd.status), cbd.err_msg);
goto _retry;
}
uError("%s: %d/%s(%s)", __func__, cbd.status, S3_get_status_name(cbd.status), cbd.err_msg); uError("%s: %d/%s(%s)", __func__, cbd.status, S3_get_status_name(cbd.status), cbd.err_msg);
TAOS_RETURN(TAOS_SYSTEM_ERROR(EIO)); TAOS_RETURN(TAOS_SYSTEM_ERROR(EIO));

View File

@ -290,7 +290,7 @@ static int32_t createSchemaByFields(const SArray *pFields, SSchemaWrapper *pWrap
pWrapper->nCols = taosArrayGetSize(pFields); pWrapper->nCols = taosArrayGetSize(pFields);
pWrapper->pSchema = taosMemoryCalloc(pWrapper->nCols, sizeof(SSchema)); pWrapper->pSchema = taosMemoryCalloc(pWrapper->nCols, sizeof(SSchema));
if (NULL == pWrapper->pSchema) { if (NULL == pWrapper->pSchema) {
return TSDB_CODE_OUT_OF_MEMORY; return terrno;
} }
SNode *pNode; SNode *pNode;
@ -328,15 +328,18 @@ static bool hasDestPrimaryKey(SSchemaWrapper *pWrapper) {
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) { static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
SNode *pAst = NULL; SNode *pAst = NULL;
SQueryPlan *pPlan = NULL; SQueryPlan *pPlan = NULL;
int32_t code = 0;
mInfo("stream:%s to create", pCreate->name); mInfo("stream:%s to create", pCreate->name);
memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN); memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
pObj->createTime = taosGetTimestampMs(); pObj->createTime = taosGetTimestampMs();
pObj->updateTime = pObj->createTime; pObj->updateTime = pObj->createTime;
pObj->version = 1; pObj->version = 1;
if (pCreate->smaId > 0) { if (pCreate->smaId > 0) {
pObj->subTableWithoutMd5 = 1; pObj->subTableWithoutMd5 = 1;
} }
pObj->smaId = pCreate->smaId; pObj->smaId = pCreate->smaId;
pObj->indexForMultiAggBalance = -1; pObj->indexForMultiAggBalance = -1;
@ -360,8 +363,10 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB); SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
if (pSourceDb == NULL) { if (pSourceDb == NULL) {
mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb, terrstr()); mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb, terrstr());
return terrno; code = terrno;
goto FAIL;
} }
pObj->sourceDbUid = pSourceDb->uid; pObj->sourceDbUid = pSourceDb->uid;
mndReleaseDb(pMnode, pSourceDb); mndReleaseDb(pMnode, pSourceDb);
@ -369,9 +374,11 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName); SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName);
if (pTargetDb == NULL) { if (pTargetDb == NULL) {
mInfo("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb, terrstr()); mError("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb, terrstr());
return terrno; code = terrno;
goto FAIL;
} }
tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN); tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN);
if (pCreate->createStb == STREAM_CREATE_STABLE_TRUE) { if (pCreate->createStb == STREAM_CREATE_STABLE_TRUE) {
@ -389,12 +396,12 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
pCreate->ast = NULL; pCreate->ast = NULL;
// deserialize ast // deserialize ast
if (nodesStringToNode(pObj->ast, &pAst) < 0) { if ((code = nodesStringToNode(pObj->ast, &pAst)) < 0) {
goto FAIL; goto FAIL;
} }
// create output schema // create output schema
if (createSchemaByFields(pCreate->pCols, &pObj->outputSchema) != TSDB_CODE_SUCCESS) { if ((code = createSchemaByFields(pCreate->pCols, &pObj->outputSchema)) != TSDB_CODE_SUCCESS) {
goto FAIL; goto FAIL;
} }
@ -403,6 +410,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
pObj->outputSchema.nCols += numOfNULL; pObj->outputSchema.nCols += numOfNULL;
SSchema *pFullSchema = taosMemoryCalloc(pObj->outputSchema.nCols, sizeof(SSchema)); SSchema *pFullSchema = taosMemoryCalloc(pObj->outputSchema.nCols, sizeof(SSchema));
if (!pFullSchema) { if (!pFullSchema) {
code = terrno;
goto FAIL; goto FAIL;
} }
@ -410,6 +418,10 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
int32_t dataIndex = 0; int32_t dataIndex = 0;
for (int16_t i = 0; i < pObj->outputSchema.nCols; i++) { for (int16_t i = 0; i < pObj->outputSchema.nCols; i++) {
SColLocation *pos = taosArrayGet(pCreate->fillNullCols, nullIndex); SColLocation *pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
if (pos == NULL) {
continue;
}
if (nullIndex >= numOfNULL || i < pos->slotId) { if (nullIndex >= numOfNULL || i < pos->slotId) {
pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes; pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId; pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId;
@ -444,22 +456,31 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
}; };
// using ast and param to build physical plan // using ast and param to build physical plan
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) { if ((code = qCreateQueryPlan(&cxt, &pPlan, NULL)) < 0) {
goto FAIL; goto FAIL;
} }
// save physcial plan // save physcial plan
if (nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL) != 0) { if ((code = nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL)) != 0) {
goto FAIL; goto FAIL;
} }
pObj->tagSchema.nCols = pCreate->numOfTags; pObj->tagSchema.nCols = pCreate->numOfTags;
if (pCreate->numOfTags) { if (pCreate->numOfTags) {
pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema)); pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema));
if (pObj->tagSchema.pSchema == NULL) {
code = terrno;
goto FAIL;
}
} }
/*A(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));*/ /*A(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));*/
for (int32_t i = 0; i < pCreate->numOfTags; i++) { for (int32_t i = 0; i < pCreate->numOfTags; i++) {
SField *pField = taosArrayGet(pCreate->pTags, i); SField *pField = taosArrayGet(pCreate->pTags, i);
if (pField == NULL) {
continue;
}
pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1; pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1;
pObj->tagSchema.pSchema[i].bytes = pField->bytes; pObj->tagSchema.pSchema[i].bytes = pField->bytes;
pObj->tagSchema.pSchema[i].flags = pField->flags; pObj->tagSchema.pSchema[i].flags = pField->flags;
@ -470,7 +491,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
FAIL: FAIL:
if (pAst != NULL) nodesDestroyNode(pAst); if (pAst != NULL) nodesDestroyNode(pAst);
if (pPlan != NULL) qDestroyQueryPlan(pPlan); if (pPlan != NULL) qDestroyQueryPlan(pPlan);
return 0; return code;
} }
int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) { int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
@ -575,12 +596,15 @@ int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream) {
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) { static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
SStbObj *pStb = NULL; SStbObj *pStb = NULL;
SDbObj *pDb = NULL; SDbObj *pDb = NULL;
int32_t code = 0;
int32_t lino = 0;
SMCreateStbReq createReq = {0}; SMCreateStbReq createReq = {0};
tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
createReq.numOfColumns = pStream->outputSchema.nCols; createReq.numOfColumns = pStream->outputSchema.nCols;
createReq.numOfTags = 1; // group id createReq.numOfTags = 1; // group id
createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns); createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns);
TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno);
// build fields // build fields
for (int32_t i = 0; i < createReq.numOfColumns; i++) { for (int32_t i = 0; i < createReq.numOfColumns; i++) {
@ -595,6 +619,8 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
if (pStream->tagSchema.nCols == 0) { if (pStream->tagSchema.nCols == 0) {
createReq.numOfTags = 1; createReq.numOfTags = 1;
createReq.pTags = taosArrayInit_s(sizeof(SField), 1); createReq.pTags = taosArrayInit_s(sizeof(SField), 1);
TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);
// build tags // build tags
SField *pField = taosArrayGet(createReq.pTags, 0); SField *pField = taosArrayGet(createReq.pTags, 0);
strcpy(pField->name, "group_id"); strcpy(pField->name, "group_id");
@ -604,6 +630,8 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
} else { } else {
createReq.numOfTags = pStream->tagSchema.nCols; createReq.numOfTags = pStream->tagSchema.nCols;
createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags); createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags);
TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);
for (int32_t i = 0; i < createReq.numOfTags; i++) { for (int32_t i = 0; i < createReq.numOfTags; i++) {
SField *pField = taosArrayGet(createReq.pTags, i); SField *pField = taosArrayGet(createReq.pTags, i);
pField->bytes = pStream->tagSchema.pSchema[i].bytes; pField->bytes = pStream->tagSchema.pSchema[i].bytes;
@ -657,7 +685,7 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pStb);
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->targetSTbName, pStream->outputSchema.nCols); mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->targetSTbName, pStream->outputSchema.nCols);
return 0; return code;
_OVER: _OVER:
tFreeSMCreateStbReq(&createReq); tFreeSMCreateStbReq(&createReq);
@ -665,7 +693,7 @@ _OVER:
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
mDebug("stream:%s failed to create dst stable:%s, code:%s", pStream->name, pStream->targetSTbName, tstrerror(terrno)); mDebug("stream:%s failed to create dst stable:%s, code:%s", pStream->name, pStream->targetSTbName, tstrerror(terrno));
return -1; return code;
} }
// 1. stream number check // 1. stream number check
@ -709,9 +737,9 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
char *sql = NULL; char *sql = NULL;
int32_t sqlLen = 0; int32_t sqlLen = 0;
const char *pMsg = "create stream tasks on dnodes"; const char *pMsg = "create stream tasks on dnodes";
int32_t code = 0; int32_t code = TSDB_CODE_SUCCESS;
terrno = TSDB_CODE_SUCCESS;
terrno = TSDB_CODE_SUCCESS;
SCMCreateStreamReq createReq = {0}; SCMCreateStreamReq createReq = {0};
if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq) != 0) { if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
code = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
@ -749,6 +777,11 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
if (createReq.sql != NULL) { if (createReq.sql != NULL) {
sqlLen = strlen(createReq.sql); sqlLen = strlen(createReq.sql);
sql = taosMemoryMalloc(sqlLen + 1); sql = taosMemoryMalloc(sqlLen + 1);
if (sql == NULL) {
code = terrno;
goto _OVER;
}
memset(sql, 0, sqlLen + 1); memset(sql, 0, sqlLen + 1);
memcpy(sql, createReq.sql, sqlLen); memcpy(sql, createReq.sql, sqlLen);
} }
@ -942,8 +975,7 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int
void *buf = taosMemoryMalloc(tlen); void *buf = taosMemoryMalloc(tlen);
if (buf == NULL) { if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno;
return -1;
} }
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
@ -1150,7 +1182,11 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
SArray *pInvalidList = taosArrayInit(4, sizeof(STaskId)); SArray *pInvalidList = taosArrayInit(4, sizeof(STaskId));
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
STaskId *p = taosArrayGet(execInfo.pTaskList, i); STaskId *p = taosArrayGet(execInfo.pTaskList, i);
if (p == NULL) {
continue;
}
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
if (pEntry == NULL) { if (pEntry == NULL) {
continue; continue;
@ -1159,8 +1195,12 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
if (pEntry->status == TASK_STATUS__STOP) { if (pEntry->status == TASK_STATUS__STOP) {
for (int32_t j = 0; j < taosArrayGetSize(pInvalidList); ++j) { for (int32_t j = 0; j < taosArrayGetSize(pInvalidList); ++j) {
STaskId *pId = taosArrayGet(pInvalidList, j); STaskId *pId = taosArrayGet(pInvalidList, j);
if (pId == NULL) {
continue;
}
if (pEntry->id.streamId == pId->streamId) { if (pEntry->id.streamId == pId->streamId) {
void* px = taosArrayPush(pInvalidList, &pEntry->id); void *px = taosArrayPush(pInvalidList, &pEntry->id);
if (px == NULL) { if (px == NULL) {
mError("failed to put stream into invalid list, code:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); mError("failed to put stream into invalid list, code:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
} }
@ -1243,6 +1283,10 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
} }
SArray *pList = taosArrayInit(4, sizeof(SCheckpointInterval)); SArray *pList = taosArrayInit(4, sizeof(SCheckpointInterval));
if (pList == NULL) {
return -1;
}
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) { while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
@ -2472,14 +2516,15 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
if (pReqTaskList == NULL) { if (pReqTaskList == NULL) {
SArray *pList = taosArrayInit(4, sizeof(STaskChkptInfo)); SArray *pList = taosArrayInit(4, sizeof(STaskChkptInfo));
doAddReportStreamTask(pList, &req); if (pList != NULL) {
doAddReportStreamTask(pList, &req);
code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &pList, POINTER_BYTES);
if (code) {
mError("stream:0x%" PRIx64 " failed to put into checkpoint stream", req.streamId);
}
code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &pList, POINTER_BYTES); pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
if (code) {
mError("stream:0x%"PRIx64 " failed to put into checkpoint stream", req.streamId);
} }
pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
} else { } else {
doAddReportStreamTask(*pReqTaskList, &req); doAddReportStreamTask(*pReqTaskList, &req);
} }
@ -2545,6 +2590,9 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
SArray *pStreamList = taosArrayInit(4, sizeof(int64_t)); SArray *pStreamList = taosArrayInit(4, sizeof(int64_t));
if (pStreamList == NULL) {
return terrno;
}
mDebug("start to process consensus-checkpointId in tmr"); mDebug("start to process consensus-checkpointId in tmr");
@ -2572,6 +2620,9 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
int64_t streamId = -1; int64_t streamId = -1;
int32_t num = taosArrayGetSize(pInfo->pTaskList); int32_t num = taosArrayGetSize(pInfo->pTaskList);
SArray *pList = taosArrayInit(4, sizeof(int32_t)); SArray *pList = taosArrayInit(4, sizeof(int32_t));
if (pList == NULL) {
continue;
}
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream); code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);

View File

@ -957,6 +957,12 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
metaReaderDoInit(&mer1, pVnode->pMeta, META_READER_LOCK); metaReaderDoInit(&mer1, pVnode->pMeta, META_READER_LOCK);
code = metaReaderGetTableEntryByUid(&mer1, pOutputInfo->tbSink.stbUid); code = metaReaderGetTableEntryByUid(&mer1, pOutputInfo->tbSink.stbUid);
if (code != TSDB_CODE_SUCCESS) {
tqError("s-task:%s vgId:%d failed to get the dst stable, failed to sink results", id, vgId);
metaReaderClear(&mer1);
return;
}
pOutputInfo->tbSink.pTagSchema = tCloneSSchemaWrapper(&mer1.me.stbEntry.schemaTag); pOutputInfo->tbSink.pTagSchema = tCloneSSchemaWrapper(&mer1.me.stbEntry.schemaTag);
metaReaderClear(&mer1); metaReaderClear(&mer1);

View File

@ -1777,6 +1777,19 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
pExp->base.resSchema = pExp->base.resSchema =
createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCaseNode->node.aliasName); createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCaseNode->node.aliasName);
pExp->pExpr->_optrRoot.pRootNode = pNode; pExp->pExpr->_optrRoot.pRootNode = pNode;
} else if (type == QUERY_NODE_LOGIC_CONDITION) {
pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
SLogicConditionNode* pCond = (SLogicConditionNode*)pNode;
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
if (!pExp->base.pParam) {
code = terrno;
}
if (TSDB_CODE_SUCCESS == code) {
pExp->base.numOfParams = 1;
SDataType* pType = &pCond->node.resType;
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCond->node.aliasName);
pExp->pExpr->_optrRoot.pRootNode = pNode;
}
} else { } else {
ASSERT(0); ASSERT(0);
} }

View File

@ -365,6 +365,7 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
ps->onlyRef = true; ps->onlyRef = true;
code = tsortAddSource(pInfo->pSortHandle, ps); code = tsortAddSource(pInfo->pSortHandle, ps);
if (code) { if (code) {
taosMemoryFree(ps);
return code; return code;
} }

View File

@ -229,6 +229,7 @@ static void udfWatchUdfd(void *args) {
if(uv_loop_close(&pData->loop) != 0) { if(uv_loop_close(&pData->loop) != 0) {
fnError("udfd loop close failed, lino:%d", __LINE__); fnError("udfd loop close failed, lino:%d", __LINE__);
} }
return;
_exit: _exit:
if (terrno != 0) { if (terrno != 0) {

View File

@ -2608,7 +2608,8 @@ static int32_t calcSelectFuncNum(SFunctionNode* pFunc, int32_t currSelectFuncNum
: 1); : 1);
} }
static void setFuncClassification(SNode* pCurrStmt, SFunctionNode* pFunc) { static void setFuncClassification(STranslateContext* pCxt, SFunctionNode* pFunc) {
SNode* pCurrStmt = pCxt->pCurrStmt;
if (NULL != pCurrStmt && QUERY_NODE_SELECT_STMT == nodeType(pCurrStmt)) { if (NULL != pCurrStmt && QUERY_NODE_SELECT_STMT == nodeType(pCurrStmt)) {
SSelectStmt* pSelect = (SSelectStmt*)pCurrStmt; SSelectStmt* pSelect = (SSelectStmt*)pCurrStmt;
pSelect->hasAggFuncs = pSelect->hasAggFuncs ? true : fmIsAggFunc(pFunc->funcId); pSelect->hasAggFuncs = pSelect->hasAggFuncs ? true : fmIsAggFunc(pFunc->funcId);
@ -2641,7 +2642,9 @@ static void setFuncClassification(SNode* pCurrStmt, SFunctionNode* pFunc) {
pSelect->hasLastFunc = pSelect->hasLastFunc ? true : (FUNCTION_TYPE_LAST == pFunc->funcType); pSelect->hasLastFunc = pSelect->hasLastFunc ? true : (FUNCTION_TYPE_LAST == pFunc->funcType);
pSelect->hasTimeLineFunc = pSelect->hasTimeLineFunc ? true : fmIsTimelineFunc(pFunc->funcId); pSelect->hasTimeLineFunc = pSelect->hasTimeLineFunc ? true : fmIsTimelineFunc(pFunc->funcId);
pSelect->hasUdaf = pSelect->hasUdaf ? true : fmIsUserDefinedFunc(pFunc->funcId) && fmIsAggFunc(pFunc->funcId); pSelect->hasUdaf = pSelect->hasUdaf ? true : fmIsUserDefinedFunc(pFunc->funcId) && fmIsAggFunc(pFunc->funcId);
pSelect->onlyHasKeepOrderFunc = pSelect->onlyHasKeepOrderFunc ? fmIsKeepOrderFunc(pFunc->funcId) : false; if (SQL_CLAUSE_SELECT == pCxt->currClause) {
pSelect->onlyHasKeepOrderFunc = pSelect->onlyHasKeepOrderFunc ? fmIsKeepOrderFunc(pFunc->funcId) : false;
}
} }
} }
@ -2903,7 +2906,7 @@ static int32_t translateNormalFunction(STranslateContext* pCxt, SNode** ppNode)
code = translateBlockDistFunc(pCxt, pFunc); code = translateBlockDistFunc(pCxt, pFunc);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
setFuncClassification(pCxt->pCurrStmt, pFunc); setFuncClassification(pCxt, pFunc);
} }
return code; return code;
} }

View File

@ -262,7 +262,7 @@ static int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarP
colDataSetNULL(pOutputData, i); colDataSetNULL(pOutputData, i);
continue; continue;
} }
out[i] = f1(in[i]); out[i] = f1(in[i]) + 0;
} }
break; break;
} }
@ -276,7 +276,7 @@ static int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarP
colDataSetNULL(pOutputData, i); colDataSetNULL(pOutputData, i);
continue; continue;
} }
out[i] = d1(in[i]); out[i] = d1(in[i]) + 0;
} }
break; break;
} }

View File

@ -57,7 +57,7 @@ class TDTestCase:
( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a ) ( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a )
( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a ) ( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a )
( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a ) ( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a )
( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now()+4a ) ( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, -0.444, 44.44, 1, "binary4", "nchar4", now()+4a )
( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a ) ( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a )
( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a ) ( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a )
@ -223,6 +223,9 @@ class TDTestCase:
tdSql.checkData(3, 4, 33) tdSql.checkData(3, 4, 33)
tdSql.checkData(5, 5, None) tdSql.checkData(5, 5, None)
tdSql.query(f"select ceil(c5) from {dbname}.t1")
tdSql.checkData(4 , 0, 0)
self.check_result_auto( f"select c1, c2, c3 , c4, c5 from {dbname}.t1", f"select (c1), ceil(c2) ,ceil(c3), ceil(c4), ceil(c5) from {dbname}.t1") self.check_result_auto( f"select c1, c2, c3 , c4, c5 from {dbname}.t1", f"select (c1), ceil(c2) ,ceil(c3), ceil(c4), ceil(c5) from {dbname}.t1")
# used for sub table # used for sub table

View File

@ -57,6 +57,8 @@ class TDTestCase:
tdSql.query("select * from (select ts, col1 from sta partition by tbname) limit 2"); tdSql.query("select * from (select ts, col1 from sta partition by tbname) limit 2");
tdSql.checkRows(2) tdSql.checkRows(2)
tdSql.query('select col1 > 0 and col2 > 0 from stb')
tdSql.checkRows(12)
def stop(self): def stop(self):
tdSql.close() tdSql.close()
tdLog.success("%s successfully executed" % __file__) tdLog.success("%s successfully executed" % __file__)

View File

@ -53,7 +53,7 @@ class TDTestCase:
( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a ) ( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a )
( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a ) ( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a )
( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a ) ( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a )
( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now()+4a ) ( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, -0.444, 44.44, 1, "binary4", "nchar4", now()+4a )
( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a ) ( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a )
( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a ) ( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a )
@ -232,6 +232,9 @@ class TDTestCase:
tdSql.checkData(3, 4, 33) tdSql.checkData(3, 4, 33)
tdSql.checkData(5, 5, None) tdSql.checkData(5, 5, None)
tdSql.query(f"select round(c5) from {dbname}.t1")
tdSql.checkData(4 , 0, 0)
self.check_result_auto( f"select c1, c2, c3 , c4, c5 from {dbname}.t1", f"select (c1), round(c2) ,round(c3), round(c4), round(c5) from {dbname}.t1") self.check_result_auto( f"select c1, c2, c3 , c4, c5 from {dbname}.t1", f"select (c1), round(c2) ,round(c3), round(c4), round(c5) from {dbname}.t1")
# used for sub table # used for sub table

View File

@ -41,8 +41,9 @@ class TDTestCase:
time.sleep(10) time.sleep(10)
tdSql.execute("use test", queryTimes=100) tdSql.execute("use test", queryTimes=100)
tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)") tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)")
tdLog.debug("========create stream and insert data ok========") time.sleep(5)
tdLog.debug("========create stream and insert data ok========")
tdSql.query("select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s) order by groupid,_wstart") tdSql.query("select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s) order by groupid,_wstart")
rowCnt = tdSql.getRows() rowCnt = tdSql.getRows()
results_meters = tdSql.queryResult results_meters = tdSql.queryResult