diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h
index dcc3c86171..103e4b254e 100644
--- a/include/libs/executor/executor.h
+++ b/include/libs/executor/executor.h
@@ -67,9 +67,10 @@ typedef enum {
* Create the exec task for stream mode
* @param pMsg
* @param SReadHandle
+ * @param vgId
* @return
*/
-qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers);
+qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId);
/**
* Create the exec task for queue mode
@@ -77,7 +78,15 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers);
* @param SReadHandle
* @return
*/
-qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema);
+qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, SSchemaWrapper** pSchema);
+
+/**
+ * set the task Id, usually used by message queue process
+ * @param tinfo
+ * @param taskId
+ * @param queryId
+ */
+void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);
/**
diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c
index 1b4292b943..9e5d9080b4 100644
--- a/source/client/src/clientImpl.c
+++ b/source/client/src/clientImpl.c
@@ -192,7 +192,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
(*pRequest)->sqlLen = sqlLen;
(*pRequest)->validateOnly = validateSql;
- SSyncQueryParam* newpParam;
+ SSyncQueryParam* newpParam = NULL;
if (param == NULL) {
newpParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
if (newpParam == NULL) {
diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c
index 416d3830b5..4fdc6081e5 100644
--- a/source/client/src/clientMain.c
+++ b/source/client/src/clientMain.c
@@ -271,8 +271,6 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
SReqResultInfo *pResultInfo;
if (msg->resIter == -1) {
pResultInfo = tmqGetNextResInfo(res, true);
- tscDebug("consumer:0x%" PRIx64 ", vgId:%d, numOfRows:%" PRId64 ", total rows:%" PRId64, msg->rsp.head.consumerId,
- msg->vgId, pResultInfo->numOfRows, pResultInfo->totalRows);
} else {
pResultInfo = tmqGetCurResInfo(res);
}
@@ -287,9 +285,6 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
return NULL;
}
- tscDebug("consumer:0x%" PRIx64 " vgId:%d, numOfRows:%" PRId64 ", total rows:%" PRId64, msg->rsp.head.consumerId,
- msg->vgId, pResultInfo->numOfRows, pResultInfo->totalRows);
-
doSetOneRowPtr(pResultInfo);
pResultInfo->current += 1;
return pResultInfo->row;
diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c
index be1b6b07a3..21590022b1 100644
--- a/source/client/src/clientMsgHandler.c
+++ b/source/client/src/clientMsgHandler.c
@@ -506,6 +506,9 @@ int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, true);
}
+ if(code != 0){
+ taosMemoryFree(pRes);
+ }
tFreeSShowVariablesRsp(&rsp);
}
diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c
index 0de4a98141..7f58b579fa 100644
--- a/source/client/src/clientSml.c
+++ b/source/client/src/clientSml.c
@@ -117,7 +117,7 @@ int64_t smlGetTimeValue(const char *value, int32_t len, uint8_t fromPrecision, u
if (unlikely(fromPrecision >= TSDB_TIME_PRECISION_HOURS)) {
int64_t unit = smlToMilli[fromPrecision - TSDB_TIME_PRECISION_HOURS];
- if (unit > INT64_MAX / tsInt64) {
+ if (tsInt64 != 0 && unit > INT64_MAX / tsInt64) {
return -1;
}
tsInt64 *= unit;
@@ -637,7 +637,10 @@ static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashO
for (int j = 0; j < taosArrayGetSize(cols); ++j) {
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, j);
ESchemaAction action = SCHEMA_ACTION_NULL;
- smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, &action, info);
+ int code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, &action, info);
+ if(code != 0){
+ return code;
+ }
if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_ADD_TAG) {
SField field = {0};
field.type = kv->type;
@@ -646,6 +649,10 @@ static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashO
taosArrayPush(results, &field);
} else if (action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) {
uint16_t *index = (uint16_t *)taosHashGet(schemaHash, kv->key, kv->keyLen);
+ if(index == NULL){
+ uError("smlBuildFieldsList get error, key:%s", kv->key);
+ return TSDB_CODE_SML_INVALID_DATA;
+ }
uint16_t newIndex = *index;
if (isTag) newIndex -= numOfCols;
SField *field = (SField *)taosArrayGet(results, newIndex);
@@ -774,9 +781,16 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) {
SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField));
SArray *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField));
- smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true);
- smlBuildFieldsList(info, NULL, NULL, sTableData->cols, pColumns, 0, false);
-
+ code = smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true);
+ if (code != TSDB_CODE_SUCCESS) {
+ uError("SML:0x%" PRIx64 " smlBuildFieldsList tag1 failed. %s", info->id, pName.tname);
+ goto end;
+ }
+ code = smlBuildFieldsList(info, NULL, NULL, sTableData->cols, pColumns, 0, false);
+ if (code != TSDB_CODE_SUCCESS) {
+ uError("SML:0x%" PRIx64 " smlBuildFieldsList col1 failed. %s", info->id, pName.tname);
+ goto end;
+ }
code = smlSendMetaMsg(info, &pName, pColumns, pTags, NULL, SCHEMA_ACTION_CREATE_STABLE);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
@@ -820,8 +834,12 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
taosArrayPush(pTags, &field);
}
}
- smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->tags, pTags,
+ code = smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->tags, pTags,
pTableMeta->tableInfo.numOfColumns, true);
+ if (code != TSDB_CODE_SUCCESS) {
+ uError("SML:0x%" PRIx64 " smlBuildFieldsList tag2 failed. %s", info->id, pName.tname);
+ goto end;
+ }
code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
if (code != TSDB_CODE_SUCCESS) {
@@ -868,8 +886,12 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
}
}
- smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->cols, pColumns,
+ code = smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->cols, pColumns,
pTableMeta->tableInfo.numOfColumns, false);
+ if (code != TSDB_CODE_SUCCESS) {
+ uError("SML:0x%" PRIx64 " smlBuildFieldsList col2 failed. %s", info->id, pName.tname);
+ goto end;
+ }
code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
if (code != TSDB_CODE_SUCCESS) {
@@ -1097,6 +1119,9 @@ SSmlHandle *smlBuildSmlInfo(TAOS *taos) {
}
if (taos != NULL) {
info->taos = acquireTscObj(*(int64_t *)taos);
+ if(info->taos == NULL){
+ goto cleanup;
+ }
code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " get catalog error %d", info->id, code);
@@ -1151,13 +1176,16 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
SSmlLineInfo *elements = info->lines + i;
SSmlTableInfo *tinfo = NULL;
if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
- tinfo = *(SSmlTableInfo **)taosHashGet(info->childTables, elements->measure, elements->measureTagsLen);
+ SSmlTableInfo** tmp = (SSmlTableInfo **)taosHashGet(info->childTables, elements->measure, elements->measureTagsLen);
+ if(tmp) tinfo = *tmp;
} else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
- tinfo = *(SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag,
- elements->measureLen + elements->tagsLen);
+ SSmlTableInfo** tmp = (SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag,
+ elements->measureLen + elements->tagsLen);
+ if(tmp) tinfo = *tmp;
} else {
- tinfo = *(SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag,
- elements->measureLen + elements->tagsLen);
+ SSmlTableInfo** tmp = (SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag,
+ elements->measureLen + elements->tagsLen);
+ if(tmp) tinfo = *tmp;
}
if (tinfo == NULL) {
diff --git a/source/client/src/clientSmlJson.c b/source/client/src/clientSmlJson.c
index da82d43950..9fd98e33b7 100644
--- a/source/client/src/clientSmlJson.c
+++ b/source/client/src/clientSmlJson.c
@@ -1237,10 +1237,12 @@ int32_t smlParseJSON(SSmlHandle *info, char *payload) {
if (cnt >= payloadNum) {
payloadNum = payloadNum << 1;
void *tmp = taosMemoryRealloc(info->lines, payloadNum * sizeof(SSmlLineInfo));
- if (tmp != NULL) {
- info->lines = (SSmlLineInfo *)tmp;
- memset(info->lines + cnt, 0, (payloadNum - cnt) * sizeof(SSmlLineInfo));
+ if (tmp == NULL) {
+ ret = TSDB_CODE_OUT_OF_MEMORY;
+ return ret;
}
+ info->lines = (SSmlLineInfo *)tmp;
+ memset(info->lines + cnt, 0, (payloadNum - cnt) * sizeof(SSmlLineInfo));
}
ret = smlParseJSONString(info, &dataPointStart, info->lines + cnt);
if ((info->lines + cnt)->measure == NULL) break;
diff --git a/source/client/src/clientSmlLine.c b/source/client/src/clientSmlLine.c
index f5ae077b5d..b1aea1bfaa 100644
--- a/source/client/src/clientSmlLine.c
+++ b/source/client/src/clientSmlLine.c
@@ -292,6 +292,7 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
info->currSTableMeta->uid = tinfo->uid;
tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
if (tinfo->tableDataCtx == NULL) {
+ smlDestroyTableInfo(info, tinfo);
smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
return TSDB_CODE_SML_INVALID_DATA;
}
diff --git a/source/client/src/clientSmlTelnet.c b/source/client/src/clientSmlTelnet.c
index ccf79cfc64..036442573d 100644
--- a/source/client/src/clientSmlTelnet.c
+++ b/source/client/src/clientSmlTelnet.c
@@ -292,7 +292,7 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
return TSDB_CODE_SUCCESS;
}
- if (info->dataFormat) {
+ if (info->dataFormat && info->currSTableMeta != NULL) {
if (needConverTime) {
kvTs.i = convertTimePrecision(kvTs.i, TSDB_TIME_PRECISION_NANO, info->currSTableMeta->tableInfo.precision);
}
diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c
index fc9f7540f7..79139a5fe5 100644
--- a/source/client/src/clientTmq.c
+++ b/source/client/src/clientTmq.c
@@ -580,7 +580,10 @@ static int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, t
int32_t code = -1;
taosThreadMutexLock(&tmq->lock);
- for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
+ int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
+
+ tscDebug("consumer:0x%" PRIx64 " user invoked commit offset for %d", tmq->consumerId, numOfTopics);
+ for (int32_t i = 0; i < numOfTopics; i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
if (strcmp(pTopic->topicName, topic) != 0) {
continue;
@@ -874,7 +877,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
return 0;
}
-static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
+static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
// do nothing
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
@@ -905,6 +908,8 @@ static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
taosArrayDestroy(pRsp->taosxRsp.createTableLen);
taosArrayDestroyP(pRsp->taosxRsp.createTableReq, taosMemoryFree);
}
+
+ return NULL;
}
void tmqClearUnhandleMsg(tmq_t* tmq) {
@@ -1333,10 +1338,10 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tDecoderClear(&decoder);
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
- tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req:%" PRId64 ", rsp:%" PRId64
- " type %d, reqId:0x%" PRIx64,
- tmq->consumerId, vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version, rspType, requestId);
-
+ char buf[80];
+ tFormatOffset(buf, 80, &pRspWrapper->dataRsp.rspOffset);
+ tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64,
+ tmq->consumerId, vgId, pRspWrapper->dataRsp.reqOffset.version, buf, rspType, requestId);
} else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
@@ -1375,6 +1380,11 @@ CREATE_MSG_FAIL:
return -1;
}
+typedef struct SVgroupSaveInfo {
+ STqOffsetVal offset;
+ int64_t numOfRows;
+} SVgroupSaveInfo;
+
static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
tmq_t* tmq) {
pTopic->schema = pTopicEp->schema;
@@ -1394,11 +1404,13 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
makeTopicVgroupKey(vgKey, pTopic->topicName, pVgEp->vgId);
- STqOffsetVal* pOffset = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));
+ SVgroupSaveInfo* pInfo = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));
+ int64_t numOfRows = 0;
STqOffsetVal offsetNew = {.type = tmq->resetOffsetCfg};
- if (pOffset != NULL) {
- offsetNew = *pOffset;
+ if (pInfo != NULL) {
+ offsetNew = pInfo->offset;
+ numOfRows = pInfo->numOfRows;
}
SMqClientVg clientVg = {
@@ -1409,7 +1421,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
.vgStatus = TMQ_VG_STATUS__IDLE,
.vgSkipCnt = 0,
.emptyBlockReceiveTs = 0,
- .numOfRows = 0,
+ .numOfRows = numOfRows,
};
taosArrayPush(pTopic->vgs, &clientVg);
@@ -1461,7 +1473,9 @@ static bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
tFormatOffset(buf, 80, &pVgCur->currentOffset);
tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch,
pVgCur->vgId, vgKey, buf);
- taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal));
+
+ SVgroupSaveInfo info = {.offset = pVgCur->currentOffset, .numOfRows = pVgCur->numOfRows};
+ taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo));
}
}
}
@@ -1602,10 +1616,11 @@ SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
return pRspObj;
}
-SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg) {
+SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
pRspObj->resType = RES_TYPE__TMQ;
+ (*numOfRows) = 0;
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
@@ -1624,8 +1639,8 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg)
for(int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, i);
int64_t rows = htobe64(pRetrieve->numOfRows);
- pRspObj->resInfo.totalRows += rows;
pVg->numOfRows += rows;
+ (*numOfRows) += rows;
}
return pRspObj;
@@ -1788,29 +1803,28 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems);
while (1) {
- SMqRspWrapper* rspWrapper = NULL;
- taosGetQitem(tmq->qall, (void**)&rspWrapper);
+ SMqRspWrapper* pRspWrapper = NULL;
+ taosGetQitem(tmq->qall, (void**)&pRspWrapper);
- if (rspWrapper == NULL) {
+ if (pRspWrapper == NULL) {
taosReadAllQitems(tmq->mqueue, tmq->qall);
- taosGetQitem(tmq->qall, (void**)&rspWrapper);
+ taosGetQitem(tmq->qall, (void**)&pRspWrapper);
- if (rspWrapper == NULL) {
+ if (pRspWrapper == NULL) {
return NULL;
}
}
- tscDebug("consumer:0x%"PRIx64" handle rsp, type:%d", tmq->consumerId, rspWrapper->tmqRspType);
+ tscDebug("consumer:0x%"PRIx64" handle rsp, type:%d", tmq->consumerId, pRspWrapper->tmqRspType);
- if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
- taosFreeQitem(rspWrapper);
+ if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
+ taosFreeQitem(pRspWrapper);
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(terrno));
return NULL;
- } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
- SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
+ } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
+ SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
- /*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp;
@@ -1833,28 +1847,31 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
char buf[80];
tFormatOffset(buf, 80, &pDataRsp->rspOffset);
if (pDataRsp->blockNum == 0) {
- tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, reqId:0x%" PRIx64, tmq->consumerId,
- pVg->vgId, buf, pollRspWrapper->reqId);
+ tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%"PRId64" total:%"PRId64" reqId:0x%" PRIx64, tmq->consumerId,
+ pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
+ pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper);
- rspWrapper = NULL;
- continue;
} else { // build rsp
- SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg);
- tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%"PRId64" reqId:0x%" PRIx64,
- tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, pRsp->resInfo.totalRows, pollRspWrapper->reqId);
+ int64_t numOfRows = 0;
+ SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
+ tmq->totalRows += numOfRows;
- tmq->totalRows += pRsp->resInfo.totalRows;
+ tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
+ " vg total:%" PRId64 " total:%" PRId64 ", reqId:0x%" PRIx64,
+ tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
+ pollRspWrapper->reqId);
taosFreeQitem(pollRspWrapper);
return pRsp;
}
} else {
- tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
- tmq->consumerId, pDataRsp->head.epoch, consumerEpoch);
- tmqFreeRspWrapper(rspWrapper);
+ SMqClientVg* pVg = pollRspWrapper->vgHandle;
+ tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
+ tmq->consumerId, pVg->vgId, pDataRsp->head.epoch, consumerEpoch);
+ pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper);
}
- } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
- SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
+ } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
+ SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId);
@@ -1868,13 +1885,13 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosFreeQitem(pollRspWrapper);
return pRsp;
} else {
- tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
- tmq->consumerId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
- tmqFreeRspWrapper(rspWrapper);
+ tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
+ tmq->consumerId, pollRspWrapper->vgHandle->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
+ pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper);
}
- } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
- SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
+ } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
+ SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) {
@@ -1883,10 +1900,10 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
if (pollRspWrapper->taosxRsp.blockNum == 0) {
- rspWrapper = NULL;
- tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId,
- pollRspWrapper->reqId);
+ tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 " reqId:0x%" PRIx64,
+ tmq->consumerId, pVg->vgId, pVg->numOfRows, pollRspWrapper->reqId);
pVg->emptyBlockReceiveTs = taosGetTimestampMs();
+ pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper);
continue;
} else {
@@ -1895,32 +1912,37 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
// build rsp
void* pRsp = NULL;
+ int64_t numOfRows = 0;
if (pollRspWrapper->taosxRsp.createTableNum == 0) {
- pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg);
+ pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
} else {
pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper);
}
+ tmq->totalRows += numOfRows;
+
char buf[80];
tFormatOffset(buf, 80, &pVg->currentOffset);
- tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, reqId:0x%"PRIx64, tmq->consumerId, pVg->vgId,
- buf, pollRspWrapper->dataRsp.blockNum, pollRspWrapper->reqId);
+ tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
+ ", vg total:%" PRId64 " total:%"PRId64" reqId:0x%" PRIx64,
+ tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows,
+ tmq->totalRows, pollRspWrapper->reqId);
taosFreeQitem(pollRspWrapper);
return pRsp;
} else {
- tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
- tmq->consumerId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
- tmqFreeRspWrapper(rspWrapper);
+ tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
+ tmq->consumerId, pollRspWrapper->vgHandle->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
+ pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper);
}
} else {
tscDebug("consumer:0x%" PRIx64 " not data msg received", tmq->consumerId);
bool reset = false;
- tmqHandleNoPollRsp(tmq, rspWrapper, &reset);
- taosFreeQitem(rspWrapper);
+ tmqHandleNoPollRsp(tmq, pRspWrapper, &reset);
+ taosFreeQitem(pRspWrapper);
if (pollIfReset && reset) {
tscDebug("consumer:0x%" PRIx64 ", reset and repoll", tmq->consumerId);
tmqPollImpl(tmq, timeout);
@@ -1968,7 +1990,6 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
if (tmqPollImpl(tmq, timeout) < 0) {
tscDebug("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId);
- /*return NULL;*/
}
rspObj = tmqHandleAllRsp(tmq, timeout, false);
@@ -1997,6 +2018,8 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
}
int32_t tmq_consumer_close(tmq_t* tmq) {
+ tscDebug("consumer:0x%" PRIx64" start to close consumer, status:%d", tmq->consumerId, tmq->status);
+
if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
int32_t rsp = tmq_commit_sync(tmq, NULL);
if (rsp != 0) {
diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp
index 0c7e95f8eb..5523e1f777 100644
--- a/source/client/test/clientTests.cpp
+++ b/source/client/test/clientTests.cpp
@@ -112,7 +112,7 @@ void createNewTable(TAOS* pConn, int32_t index) {
}
taos_free_result(pRes);
- for(int32_t i = 0; i < 2000; i += 20) {
+ for(int32_t i = 0; i < 100; i += 20) {
char sql[1024] = {0};
sprintf(sql,
"insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
@@ -167,6 +167,80 @@ void tmq_commit_cb_print(tmq_t *pTmq, int32_t code, void *param) {
printf("success, code:%d\n", code);
}
+void* doConsumeData(void* param) {
+ TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
+
+ tmq_conf_t* conf = tmq_conf_new();
+ tmq_conf_set(conf, "enable.auto.commit", "true");
+ tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
+ tmq_conf_set(conf, "group.id", "cgrpName12");
+ tmq_conf_set(conf, "td.connect.user", "root");
+ tmq_conf_set(conf, "td.connect.pass", "taosdata");
+ tmq_conf_set(conf, "auto.offset.reset", "earliest");
+ tmq_conf_set(conf, "experimental.snapshot.enable", "true");
+ tmq_conf_set(conf, "msg.with.table.name", "true");
+ tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
+
+ tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
+ tmq_conf_destroy(conf);
+
+ // 创建订阅 topics 列表
+ tmq_list_t* topicList = tmq_list_new();
+ tmq_list_append(topicList, "topic_t2");
+
+ // 启动订阅
+ tmq_subscribe(tmq, topicList);
+
+ tmq_list_destroy(topicList);
+
+ TAOS_FIELD* fields = NULL;
+ int32_t numOfFields = 0;
+ int32_t precision = 0;
+ int32_t totalRows = 0;
+ int32_t msgCnt = 0;
+ int32_t timeout = 25000;
+
+ int32_t count = 0;
+
+ while (1) {
+ TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
+ if (pRes) {
+ char buf[1024];
+
+ const char* topicName = tmq_get_topic_name(pRes);
+ const char* dbName = tmq_get_db_name(pRes);
+ int32_t vgroupId = tmq_get_vgroup_id(pRes);
+
+ printf("topic: %s\n", topicName);
+ printf("db: %s\n", dbName);
+ printf("vgroup id: %d\n", vgroupId);
+
+ while (1) {
+ TAOS_ROW row = taos_fetch_row(pRes);
+ if (row == NULL) {
+ break;
+ }
+
+ fields = taos_fetch_fields(pRes);
+ numOfFields = taos_field_count(pRes);
+ precision = taos_result_precision(pRes);
+ taos_print_row(buf, row, fields, numOfFields);
+ totalRows += 1;
+// printf("precision: %d, row content: %s\n", precision, buf);
+ }
+
+ taos_free_result(pRes);
+ } else {
+ break;
+ }
+ }
+
+ tmq_consumer_close(tmq);
+ taos_close(pConn);
+ fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
+ return NULL;
+}
+
} // namespace
int main(int argc, char** argv) {
@@ -188,7 +262,6 @@ TEST(clientCase, driverInit_Test) {
TEST(clientCase, connect_Test) {
taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
-
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
printf("failed to connect to server, reason:%s\n", taos_errstr(NULL));
@@ -708,7 +781,7 @@ TEST(clientCase, projection_query_tables) {
// }
// taos_free_result(pRes);
- TAOS_RES* pRes = taos_query(pConn, "use abc2");
+ TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes);
pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)");
@@ -730,7 +803,7 @@ TEST(clientCase, projection_query_tables) {
}
taos_free_result(pRes);
- for (int32_t i = 0; i < 2; ++i) {
+ for (int32_t i = 0; i < 10000; ++i) {
printf("create table :%d\n", i);
createNewTable(pConn, i);
}
@@ -970,28 +1043,23 @@ TEST(clientCase, sub_db_test) {
taos_print_row(buf, row, fields, numOfFields);
printf("precision: %d, row content: %s\n", precision, buf);
}
+ taos_free_result(pRes);
}
-// return rows;
}
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
}
TEST(clientCase, sub_tb_test) {
+ taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
+
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
- // TAOS_RES* pRes = taos_query(pConn, "create topic topic_t1 as select * from t1");
- // if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
- // printf("failed to create topic, code:%s", taos_errstr(pRes));
- // taos_free_result(pRes);
- // return;
- // }
-
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
- tmq_conf_set(conf, "group.id", "cgrpName");
+ tmq_conf_set(conf, "group.id", "cgrpName27");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
@@ -1004,10 +1072,11 @@ TEST(clientCase, sub_tb_test) {
// 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new();
- tmq_list_append(topicList, "topic_t1");
+ tmq_list_append(topicList, "topic_t2");
// 启动订阅
tmq_subscribe(tmq, topicList);
+
tmq_list_destroy(topicList);
TAOS_FIELD* fields = NULL;
@@ -1015,7 +1084,7 @@ TEST(clientCase, sub_tb_test) {
int32_t precision = 0;
int32_t totalRows = 0;
int32_t msgCnt = 0;
- int32_t timeout = 5000;
+ int32_t timeout = 25000;
int32_t count = 0;
@@ -1023,7 +1092,6 @@ TEST(clientCase, sub_tb_test) {
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
if (pRes) {
char buf[1024];
- int32_t rows = 0;
const char* topicName = tmq_get_topic_name(pRes);
const char* dbName = tmq_get_db_name(pRes);
@@ -1033,27 +1101,45 @@ TEST(clientCase, sub_tb_test) {
printf("db: %s\n", dbName);
printf("vgroup id: %d\n", vgroupId);
- if (count ++ > 200) {
- tmq_unsubscribe(tmq);
- break;
- }
-
while (1) {
TAOS_ROW row = taos_fetch_row(pRes);
- if (row == NULL) break;
+ if (row == NULL) {
+ break;
+ }
fields = taos_fetch_fields(pRes);
numOfFields = taos_field_count(pRes);
precision = taos_result_precision(pRes);
- rows++;
taos_print_row(buf, row, fields, numOfFields);
+ totalRows += 1;
printf("precision: %d, row content: %s\n", precision, buf);
}
+
+ taos_free_result(pRes);
+// if ((++count) > 1) {
+// break;
+// }
+ } else {
+ break;
}
-// return rows;
}
+ tmq_consumer_close(tmq);
+ taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
}
+TEST(clientCase, sub_tb_mt_test) {
+ taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
+ TdThread qid[20] = {0};
+
+ for(int32_t i = 0; i < 1; ++i) {
+ taosThreadCreate(&qid[i], NULL, doConsumeData, NULL);
+ }
+
+ for(int32_t i = 0; i < 4; ++i) {
+ taosThreadJoin(qid[i], NULL);
+ }
+}
+
#pragma GCC diagnostic pop
diff --git a/source/common/src/systable.c b/source/common/src/systable.c
index 141504a7c4..919a09962b 100644
--- a/source/common/src/systable.c
+++ b/source/common/src/systable.c
@@ -233,7 +233,7 @@ static const SSysDbTableSchema vgroupsSchema[] = {
{.name = "v4_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
{.name = "v4_status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "cacheload", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
- {.name = "cacheTables", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
+ {.name = "cacheelements", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "tsma", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT, .sysInfo = true},
// {.name = "compact_start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
};
diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c
index 39ee443e51..d1671aa12a 100644
--- a/source/dnode/mnode/impl/src/mndScheduler.c
+++ b/source/dnode/mnode/impl/src/mndScheduler.c
@@ -115,7 +115,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream
if (pStream->fixedSinkVgId == 0) {
SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
- if (pDb->cfg.numOfVgroups > 1) {
+ if (pDb != NULL && pDb->cfg.numOfVgroups > 1) {
isShuffle = true;
pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c
index 48d8e89bfe..7fe08514f6 100644
--- a/source/dnode/mnode/impl/src/mndShow.c
+++ b/source/dnode/mnode/impl/src/mndShow.c
@@ -134,7 +134,7 @@ static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq) {
showObj.pMnode = pMnode;
showObj.type = convertToRetrieveType(pReq->tb, tListLen(pReq->tb));
memcpy(showObj.db, pReq->db, TSDB_DB_FNAME_LEN);
- strncpy(showObj.filterTb, pReq->filterTb, TSDB_TABLE_NAME_LEN);
+ tstrncpy(showObj.filterTb, pReq->filterTb, TSDB_TABLE_NAME_LEN);
int32_t keepTime = tsShellActivityTimer * 6 * 1000;
SShowObj *pShow = taosCachePut(pMgmt->cache, &showId, sizeof(int64_t), &showObj, size, keepTime);
diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c
index 9e61364801..2a369a863a 100644
--- a/source/dnode/mnode/impl/src/mndStb.c
+++ b/source/dnode/mnode/impl/src/mndStb.c
@@ -3114,7 +3114,6 @@ static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(typeName, "SUPER_TABLE");
while (numOfRows < rows) {
- void *prevIter = pShow->pIter;
pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
if (pShow->pIter == NULL) break;
@@ -3123,12 +3122,6 @@ static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
continue;
}
- if ((numOfRows + pStb->numOfColumns) > rows) {
- pShow->pIter = prevIter;
- sdbRelease(pSdb, pStb);
- break;
- }
-
SName name = {0};
char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c
index d4ca81a6a9..3d1b356f8c 100644
--- a/source/dnode/snode/src/snode.c
+++ b/source/dnode/snode/src/snode.c
@@ -75,9 +75,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
-
pTask->pMsgCb = &pSnode->msgCb;
-
pTask->startVer = ver;
pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
@@ -90,11 +88,11 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
.numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo),
.pStateBackend = pTask->pState,
};
- pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle);
+
+ pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle, 0);
ASSERT(pTask->exec.executor);
streamSetupTrigger(pTask);
-
return 0;
}
diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h
index 2a720eb589..ed4facb061 100644
--- a/source/dnode/vnode/inc/vnode.h
+++ b/source/dnode/vnode/inc/vnode.h
@@ -180,6 +180,7 @@ int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t n
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr);
+void tsdbReaderSetId(STsdbReader* pReader, const char* idstr);
void tsdbReaderClose(STsdbReader *pReader);
bool tsdbNextDataBlock(STsdbReader *pReader);
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave);
diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c
index 4b1163bb11..6ab322d26a 100644
--- a/source/dnode/vnode/src/meta/metaQuery.c
+++ b/source/dnode/vnode/src/meta/metaQuery.c
@@ -14,6 +14,8 @@
*/
#include "meta.h"
+#include "osMemory.h"
+#include "tencode.h"
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) {
memset(pReader, 0, sizeof(*pReader));
@@ -1235,9 +1237,14 @@ END:
return 0;
}
int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
- int32_t ret = 0;
- char *buf = NULL;
+ SMetaEntry oStbEntry = {0};
+ int32_t ret = -1;
+ char *buf = NULL;
+ void *pData = NULL;
+ int nData = 0;
+ SDecoder dc = {0};
+ STbDbKey tbDbKey = {0};
STagIdxKey *pKey = NULL;
int32_t nKey = 0;
@@ -1249,8 +1256,34 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
pCursor->type = param->type;
metaRLock(pMeta);
+
+ if (tdbTbGet(pMeta->pUidIdx, ¶m->suid, sizeof(tb_uid_t), &pData, &nData) != 0) {
+ goto END;
+ }
+ tbDbKey.uid = param->suid;
+ tbDbKey.version = ((SUidIdxVal *)pData)[0].version;
+ tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData);
+
+ tDecoderInit(&dc, pData, nData);
+ ret = metaDecodeEntry(&dc, &oStbEntry);
+
+ if (oStbEntry.stbEntry.schemaTag.pSchema == NULL || oStbEntry.stbEntry.schemaTag.pSchema == NULL) {
+ ret = -1;
+ goto END;
+ }
+ ret = -1;
+ for (int i = 0; i < oStbEntry.stbEntry.schemaTag.nCols; i++) {
+ SSchema *schema = oStbEntry.stbEntry.schemaTag.pSchema + i;
+ if (schema->colId == param->cid && param->type == schema->type && (IS_IDX_ON(schema) || i == 0)) {
+ ret = 0;
+ }
+ }
+ if (ret != 0) {
+ goto END;
+ }
+
ret = tdbTbcOpen(pMeta->pTagIdx, &pCursor->pCur, NULL);
- if (ret < 0) {
+ if (ret != 0) {
goto END;
}
@@ -1271,6 +1304,7 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
maxSize = 4 * nTagData + 1;
buf = taosMemoryCalloc(1, maxSize);
if (false == taosMbsToUcs4(tagData, nTagData, (TdUcs4 *)buf, maxSize, &maxSize)) {
+ ret = -1;
goto END;
}
@@ -1288,8 +1322,10 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
if (ret != 0) {
goto END;
}
+
int cmp = 0;
- if (tdbTbcMoveTo(pCursor->pCur, pKey, nKey, &cmp) < 0) {
+ ret = tdbTbcMoveTo(pCursor->pCur, pKey, nKey, &cmp);
+ if (ret != 0) {
goto END;
}
@@ -1353,6 +1389,10 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
END:
if (pCursor->pMeta) metaULock(pCursor->pMeta);
if (pCursor->pCur) tdbTbcClose(pCursor->pCur);
+ if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf);
+ tDecoderClear(&dc);
+ tdbFree(pData);
+
taosMemoryFree(buf);
taosMemoryFree(pKey);
diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c
index 99e171dde1..3ed1b083e4 100644
--- a/source/dnode/vnode/src/sma/smaRollup.c
+++ b/source/dnode/vnode/src/sma/smaRollup.c
@@ -282,7 +282,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
.initTqReader = 1,
.pStateBackend = pStreamState,
};
- pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle);
+ pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode));
if (!pRSmaInfo->taskInfo[idx]) {
terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
return TSDB_CODE_FAILED;
@@ -864,7 +864,7 @@ static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t
TSDB_CHECK_CODE(code, lino, _exit);
}
- dstTaskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle);
+ dstTaskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode));
if (!dstTaskInfo) {
code = TSDB_CODE_RSMA_QTASKINFO_CREATE;
TSDB_CHECK_CODE(code, lino, _exit);
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index 653eb2b9c4..646abc5d6a 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -243,8 +243,8 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
tFormatOffset(buf1, 80, &pRsp->reqOffset);
tFormatOffset(buf2, 80, &pRsp->rspOffset);
- tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s",
- TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
+ tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%"PRIx64,
+ TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId);
return 0;
}
@@ -256,6 +256,7 @@ static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOf
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
STqOffset offset = {0};
+ int32_t vgId = TD_VID(pTq->pVnode);
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
@@ -267,10 +268,10 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA || offset.val.type == TMQ_OFFSET__SNAPSHOT_META) {
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
- offset.subKey, TD_VID(pTq->pVnode), offset.val.uid, offset.val.ts);
+ offset.subKey, vgId, offset.val.uid, offset.val.ts);
} else if (offset.val.type == TMQ_OFFSET__LOG) {
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey,
- TD_VID(pTq->pVnode), offset.val.version);
+ vgId, offset.val.version);
if (offset.val.version + 1 == sversion) {
offset.val.version += 1;
}
@@ -362,6 +363,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
uint64_t consumerId = pRequest->consumerId;
STqOffsetVal reqOffset = pRequest->reqOffset;
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey);
+ int32_t vgId = TD_VID(pTq->pVnode);
+
*pBlockReturned = false;
// In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
@@ -371,12 +374,15 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
char formatBuf[80];
tFormatOffset(formatBuf, 80, pOffsetVal);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, prev offset found, offset reset to %s and continue.",
- consumerId, pHandle->subKey, TD_VID(pTq->pVnode), formatBuf);
+ consumerId, pHandle->subKey, vgId, formatBuf);
return 0;
} else {
// no poll occurs in this vnode for this topic, let's seek to the right offset value.
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
if (pRequest->useSnapshot) {
+ tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
+ consumerId, pHandle->subKey, vgId);
+
if (pHandle->fetchMeta) {
tqOffsetResetToMeta(pOffsetVal, 0);
} else {
@@ -397,8 +403,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
- tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, offset reset to %" PRId64, consumerId,
- pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
+ tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
+ pHandle->subKey, vgId, dataRsp.rspOffset.version);
int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
tDeleteSMqDataRsp(&dataRsp);
@@ -409,16 +415,14 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
tqInitTaosxRsp(&taosxRsp, pRequest);
tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
-// int32_t code = tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp);
tDeleteSTaosxRsp(&taosxRsp);
*pBlockReturned = true;
return code;
}
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
- tqError("tmq poll: subkey %s, no offset committed for consumer:0x%" PRIx64
- " in vg %d, subkey %s, reset none failed",
- pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pRequest->subKey);
+ tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64 " in vg %d, subkey %s, reset none failed",
+ pHandle->subKey, consumerId, vgId, pRequest->subKey);
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
return -1;
}
@@ -435,11 +439,15 @@ static int32_t processSubColumn(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
// lock
taosWLockLatch(&pTq->pushLock);
+ qSetTaskId(pHandle->execHandle.task, pRequest->consumerId, pRequest->reqId);
+
int code = tqScanData(pTq, pHandle, &dataRsp, offset);
if(code != 0) {
tDeleteSMqDataRsp(&dataRsp);
+ taosWUnLockLatch(&pTq->pushLock);
return TSDB_CODE_TMQ_CONSUMER_ERROR;
}
+
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
@@ -725,7 +733,10 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
SMqRebVgReq req = {0};
tDecodeSMqRebVgReq(msg, &req);
- tqDebug("vgId:%d, tq process sub req %s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
+ SVnode* pVnode = pTq->pVnode;
+ int32_t vgId = TD_VID(pVnode);
+
+ tqDebug("vgId:%d, tq process sub req %s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
req.oldConsumerId, req.newConsumerId);
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
@@ -754,7 +765,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
pHandle->fetchMeta = req.withMeta;
// TODO version should be assigned and refed during preprocess
- SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal);
+ SWalRef* pRef = walRefCommittedVer(pVnode->pWal);
if (pRef == NULL) {
taosMemoryFree(req.qmsg);
return -1;
@@ -764,8 +775,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
pHandle->pRef = pRef;
SReadHandle handle = {
- .meta = pTq->pVnode->pMeta,
- .vnode = pTq->pVnode,
+ .meta = pVnode->pMeta,
+ .vnode = pVnode,
.initTableReader = true,
.initTqReader = true,
.version = ver,
@@ -778,38 +789,38 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
req.qmsg = NULL;
pHandle->execHandle.task =
- qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, NULL);
+ qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId, &pHandle->execHandle.numOfCols, NULL);
void* scanner = NULL;
qExtractStreamScanner(pHandle->execHandle.task, &scanner);
pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
- pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
- pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
+ pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
+ pHandle->execHandle.pExecReader = tqOpenReader(pVnode);
pHandle->execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
(SSnapContext**)(&handle.sContext));
- pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
+ pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, NULL);
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
- pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
+ pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
pHandle->execHandle.execTb.suid = req.suid;
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
- vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList);
- tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, req.suid);
+ vnodeGetCtbIdList(pVnode, req.suid, tbUidList);
+ tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pVnode->config.vgId, req.suid);
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
- tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid);
+ tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
}
- pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
+ pHandle->execHandle.pExecReader = tqOpenReader(pVnode);
tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList);
taosArrayDestroy(tbUidList);
buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
(SSnapContext**)(&handle.sContext));
- pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
+ pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, NULL);
}
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
@@ -862,6 +873,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
}
#endif
+ int32_t vgId = TD_VID(pTq->pVnode);
pTask->refCnt = 1;
pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;
@@ -874,9 +886,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
-
pTask->pMsgCb = &pTq->pVnode->msgCb;
-
pTask->startVer = ver;
// expand executor
@@ -896,7 +906,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
.initTqReader = 1,
.pStateBackend = pTask->pState,
};
- pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
+
+ pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
if (pTask->exec.executor == NULL) {
return -1;
}
@@ -911,7 +922,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
.numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo),
.pStateBackend = pTask->pState,
};
- pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle);
+
+ pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle, vgId);
if (pTask->exec.executor == NULL) {
return -1;
}
@@ -941,9 +953,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
}
streamSetupTrigger(pTask);
-
- tqInfo("expand stream task on vg %d, task id %d, child id %d, level %d", TD_VID(pTq->pVnode), pTask->taskId,
- pTask->selfChildId, pTask->taskLevel);
+ tqInfo("expand stream task on vg %d, task id %d, child id %d, level %d", vgId, pTask->taskId, pTask->selfChildId, pTask->taskLevel);
return 0;
}
diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c
index 19c8f8af3e..3063f0d372 100644
--- a/source/dnode/vnode/src/tq/tqExec.c
+++ b/source/dnode/vnode/src/tq/tqExec.c
@@ -82,7 +82,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
return -1;
}
- tqDebug("vgId:%d, tmq task executed, get %p", pTq->pVnode->config.vgId, pDataBlock);
+ tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq task executed, get %p", pHandle->consumerId, pTq->pVnode->config.vgId, pDataBlock);
// current scan should be stopped asap, since the rebalance occurs.
if (pDataBlock == NULL) {
diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c
index ce0aa144f9..b6bca1e4ca 100644
--- a/source/dnode/vnode/src/tq/tqMeta.c
+++ b/source/dnode/vnode/src/tq/tqMeta.c
@@ -269,11 +269,13 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) {
}
int32_t tqMetaRestoreHandle(STQ* pTq) {
+ int code = 0;
TBC* pCur = NULL;
if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) {
return -1;
}
+ int32_t vgId = TD_VID(pTq->pVnode);
void* pKey = NULL;
int kLen = 0;
void* pVal = NULL;
@@ -290,7 +292,8 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
handle.pRef = walOpenRef(pTq->pVnode->pWal);
if (handle.pRef == NULL) {
- return -1;
+ code = -1;
+ goto end;
}
walRefVer(handle.pRef, handle.snapshotVer);
@@ -304,19 +307,24 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
handle.execHandle.task =
- qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, NULL);
+ qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, vgId, &handle.execHandle.numOfCols, NULL);
if (handle.execHandle.task == NULL) {
tqError("cannot create exec task for %s", handle.subKey);
- return -1;
+ code = -1;
+ goto end;
}
void* scanner = NULL;
qExtractStreamScanner(handle.execHandle.task, &scanner);
if (scanner == NULL) {
tqError("cannot extract stream scanner for %s", handle.subKey);
+ code = -1;
+ goto end;
}
handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
if (handle.execHandle.pExecReader == NULL) {
tqError("cannot extract exec reader for %s", handle.subKey);
+ code = -1;
+ goto end;
}
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__DB) {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
@@ -324,7 +332,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta,
(SSnapContext**)(&reader.sContext));
- handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL);
+ handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, NULL);
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
@@ -341,14 +349,15 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType,
handle.fetchMeta, (SSnapContext**)(&reader.sContext));
- handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL);
+ handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, NULL);
}
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode));
taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
}
+end:
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
- return 0;
+ return code;
}
diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c
index dafd4d7485..8a89cb6bd7 100644
--- a/source/dnode/vnode/src/tq/tqSink.c
+++ b/source/dnode/vnode/src/tq/tqSink.c
@@ -370,11 +370,6 @@ int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
}
return TSDB_CODE_SUCCESS;
-
-_error:
- terrno = TSDB_CODE_OUT_OF_MEMORY;
- tqError("failed to encode submit req since %s", terrstr());
- return TSDB_CODE_OUT_OF_MEMORY;
}
void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
@@ -441,9 +436,6 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
for (int32_t rowId = 0; rowId < rows; rowId++) {
SVCreateTbReq createTbReq = {0};
SVCreateTbReq* pCreateTbReq = &createTbReq;
- if (!pCreateTbReq) {
- goto _end;
- }
// set const
pCreateTbReq->flags = 0;
@@ -460,6 +452,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
if (size == 2) {
tagArray = taosArrayInit(1, sizeof(STagVal));
if (!tagArray) {
+ tdDestroySVCreateTbReq(pCreateTbReq);
goto _end;
}
STagVal tagVal = {
@@ -477,6 +470,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
} else {
tagArray = taosArrayInit(size - 1, sizeof(STagVal));
if (!tagArray) {
+ tdDestroySVCreateTbReq(pCreateTbReq);
goto _end;
}
for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) {
@@ -503,6 +497,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
tTagNew(tagArray, 1, false, &pTag);
tagArray = taosArrayDestroy(tagArray);
if (pTag == NULL) {
+ tdDestroySVCreateTbReq(pCreateTbReq);
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
@@ -556,6 +551,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
SVCreateTbReq* pCreateTbReq = NULL;
if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) {
+ taosMemoryFree(ctbName);
goto _end;
};
@@ -572,6 +568,8 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
// set tag content
tagArray = taosArrayInit(1, sizeof(STagVal));
if (!tagArray) {
+ taosMemoryFree(ctbName);
+ tdDestroySVCreateTbReq(pCreateTbReq);
goto _end;
}
STagVal tagVal = {
@@ -586,6 +584,8 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
tTagNew(tagArray, 1, false, &pTag);
tagArray = taosArrayDestroy(tagArray);
if (pTag == NULL) {
+ taosMemoryFree(ctbName);
+ tdDestroySVCreateTbReq(pCreateTbReq);
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
@@ -630,6 +630,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
// rows
if (!pVals && !(pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)))) {
taosArrayDestroy(tbData.aRowP);
+ tdDestroySVCreateTbReq(tbData.pCreateTbReq);
goto _end;
}
@@ -680,6 +681,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
SSubmitReq2 submitReq = {0};
if (!(submitReq.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
+ tDestroySSubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
goto _end;
}
@@ -693,6 +695,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
len += sizeof(SSubmitReq2Msg);
pBuf = rpcMallocCont(len);
if (NULL == pBuf) {
+ tDestroySSubmitReq2(&submitReq, TSDB_MSG_FLG_ENCODE);
goto _end;
}
((SSubmitReq2Msg*)pBuf)->header.vgId = TD_VID(pVnode);
@@ -704,6 +707,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
tqError("failed to encode submit req since %s", terrstr());
tEncoderClear(&encoder);
rpcFreeCont(pBuf);
+ tDestroySSubmitReq2(&submitReq, TSDB_MSG_FLG_ENCODE);
continue;
}
tEncoderClear(&encoder);
diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c
index e4c059b235..3d01184e78 100644
--- a/source/dnode/vnode/src/tsdb/tsdbCache.c
+++ b/source/dnode/vnode/src/tsdb/tsdbCache.c
@@ -658,6 +658,9 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
bool hasVal = false;
state->row = tMergeTreeGetRow(&state->mergeTree);
*ppRow = &state->row;
+ if (nCols != state->pLoadInfo->numOfCols) {
+ state->pLoadInfo->numOfCols = nCols;
+ }
hasVal = tMergeTreeNext(&state->mergeTree);
if (TSDBROW_TS(&state->row) <= state->lastTs) {
*pIgnoreEarlierTs = true;
diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c
index 101b0291ce..ad93cc567c 100644
--- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c
+++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c
@@ -332,6 +332,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
// retrieve the only one last row of all tables in the uid list.
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
int64_t st = taosGetTimestampUs();
+ int64_t totalLastTs = INT64_MAX;
for (int32_t i = 0; i < pr->numOfTables; ++i) {
STableKeyInfo* pKeyInfo = &pr->pTableList[i];
@@ -350,7 +351,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
{
bool hasNotNullRow = true;
- int64_t minTs = INT64_MAX;
+ int64_t singleTableLastTs = INT64_MAX;
for (int32_t k = 0; k < pr->numOfCols; ++k) {
int32_t slotId = slotIds[k];
@@ -361,7 +362,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
hasRes = true;
p->ts = pCol->ts;
p->colVal = pCol->colVal;
- minTs = pCol->ts;
+ singleTableLastTs = pCol->ts;
// only set value for last row query
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
@@ -386,8 +387,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
hasRes = true;
p->ts = pColVal->ts;
- if (pColVal->ts < minTs && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
- minTs = pColVal->ts;
+ if (pColVal->ts < singleTableLastTs && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
+ singleTableLastTs = pColVal->ts;
}
if (!IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
@@ -407,9 +408,12 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
}
if (hasNotNullRow) {
+ if (INT64_MAX == totalLastTs || (INT64_MAX != singleTableLastTs && totalLastTs < singleTableLastTs)) {
+ totalLastTs = singleTableLastTs;
+ }
double cost = (taosGetTimestampUs() - st) / 1000.0;
if (cost > tsCacheLazyLoadThreshold) {
- pr->lastTs = minTs;
+ pr->lastTs = totalLastTs;
}
}
}
diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c
index 5c7c63f59f..e5f3c7406d 100644
--- a/source/dnode/vnode/src/tsdb/tsdbRead.c
+++ b/source/dnode/vnode/src/tsdb/tsdbRead.c
@@ -427,7 +427,7 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf
return pTableMap;
}
-static void resetAllDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
+static void resetAllDataBlockScanInfo(SHashObj* pTableMap, int64_t ts, int32_t step) {
STableBlockScanInfo** p = NULL;
while ((p = taosHashIterate(pTableMap, p)) != NULL) {
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
@@ -446,6 +446,7 @@ static void resetAllDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
pInfo->lastKey = ts;
+ pInfo->lastKeyInStt = ts + step;
}
}
@@ -2471,7 +2472,6 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
initMemDataIterator(pScanInfo, pReader);
pLBlockReader->uid = pScanInfo->uid;
- int32_t step = ASCENDING_TRAVERSE(pLBlockReader->order) ? 1 : -1;
STimeWindow w = pLBlockReader->window;
if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
w.skey = pScanInfo->lastKeyInStt;
@@ -4457,8 +4457,9 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
if (pReader->step == EXTERNAL_ROWS_PREV) {
// prepare for the main scan
- int32_t code = doOpenReaderImpl(pReader);
- resetAllDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey);
+ code = doOpenReaderImpl(pReader);
+ int32_t step = 1;
+ resetAllDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey, step);
if (code != TSDB_CODE_SUCCESS) {
return code;
@@ -4479,8 +4480,9 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
if (pReader->step == EXTERNAL_ROWS_MAIN && pReader->innerReader[1] != NULL) {
// prepare for the next row scan
- int32_t code = doOpenReaderImpl(pReader->innerReader[1]);
- resetAllDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey);
+ int32_t step = -1;
+ code = doOpenReaderImpl(pReader->innerReader[1]);
+ resetAllDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey, step);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@@ -4683,15 +4685,12 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
}
if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
- tsdbDebug("tsdb reader reset return %p", pReader->pReadSnap);
-
+ tsdbDebug("tsdb reader reset return %p, %s", pReader->pReadSnap, pReader->idStr);
tsdbReleaseReader(pReader);
-
return TSDB_CODE_SUCCESS;
}
SReaderStatus* pStatus = &pReader->status;
-
SDataBlockIter* pBlockIter = &pStatus->blockIter;
pReader->order = pCond->order;
@@ -4712,8 +4711,10 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
resetDataBlockIterator(pBlockIter, pReader->order);
resetTableListIndex(&pReader->status);
- int64_t ts = ASCENDING_TRAVERSE(pReader->order) ? pReader->window.skey - 1 : pReader->window.ekey + 1;
- resetAllDataBlockScanInfo(pStatus->pTableMap, ts);
+ bool asc = ASCENDING_TRAVERSE(pReader->order);
+ int32_t step = asc? 1:-1;
+ int64_t ts = asc? pReader->window.skey - 1 : pReader->window.ekey + 1;
+ resetAllDataBlockScanInfo(pStatus->pTableMap, ts, step);
int32_t code = 0;
@@ -4728,7 +4729,6 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
tsdbReleaseReader(pReader);
-
return code;
}
}
@@ -4995,3 +4995,9 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proacti
}
tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode));
}
+
+// if failed, do nothing
+void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) {
+ taosMemoryFreeClear(pReader->idStr);
+ pReader->idStr = taosStrdup(idstr);
+}
\ No newline at end of file
diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h
index c3177183bb..84602917f2 100644
--- a/source/libs/executor/inc/executorimpl.h
+++ b/source/libs/executor/inc/executorimpl.h
@@ -115,6 +115,7 @@ typedef struct STaskIdInfo {
uint64_t subplanId;
uint64_t templateId;
char* str;
+ int32_t vgId;
} STaskIdInfo;
enum {
@@ -834,8 +835,10 @@ void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode);
void doDestroyTask(SExecTaskInfo* pTaskInfo);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
-int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
- char* sql, EOPTR_EXEC_MODEL model);
+char* buildTaskId(uint64_t taskId, uint64_t queryId);
+
+int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
+ int32_t vgId, char* sql, EOPTR_EXEC_MODEL model);
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle);
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList);
diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c
index 7390cc1d26..dce358ab6d 100644
--- a/source/libs/executor/src/executil.c
+++ b/source/libs/executor/src/executil.c
@@ -17,6 +17,7 @@
#include "functionMgt.h"
#include "index.h"
#include "os.h"
+#include "query.h"
#include "tdatablock.h"
#include "thash.h"
#include "tmsg.h"
@@ -1058,10 +1059,10 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
SIndexMetaArg metaArg = {
.metaEx = metaHandle, .idx = tsdbGetIdx(metaHandle), .ivtIdx = pIndex, .suid = pScanNode->uid};
- SIdxFltStatus status = SFLT_NOT_INDEX;
+ status = SFLT_NOT_INDEX;
code = doFilterTag(pTagIndexCond, &metaArg, pUidList, &status);
if (code != 0 || status == SFLT_NOT_INDEX) { // temporarily disable it for performance sake
- // qError("failed to get tableIds from index, reason:%s, suid:%" PRIu64, tstrerror(code), tableUid);
+ qWarn("failed to get tableIds from index, suid:%" PRIu64, pScanNode->uid);
code = TDB_CODE_SUCCESS;
} else {
qInfo("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList));
diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c
index fa576329a6..61ad44f1e6 100644
--- a/source/libs/executor/src/executor.c
+++ b/source/libs/executor/src/executor.c
@@ -159,6 +159,30 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
}
}
+void doSetTaskId(SOperatorInfo* pOperator) {
+ SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
+ if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
+ SStreamScanInfo* pStreamScanInfo = pOperator->info;
+ STableScanInfo* pScanInfo = pStreamScanInfo->pTableScanOp->info;
+ if (pScanInfo->base.dataReader != NULL) {
+ tsdbReaderSetId(pScanInfo->base.dataReader, pTaskInfo->id.str);
+ }
+ } else {
+ doSetTaskId(pOperator->pDownstream[0]);
+ }
+}
+
+void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) {
+ SExecTaskInfo* pTaskInfo = tinfo;
+ pTaskInfo->id.queryId = queryId;
+
+ taosMemoryFreeClear(pTaskInfo->id.str);
+ pTaskInfo->id.str = buildTaskId(taskId, queryId);
+
+ // set the idstr for tsdbReader
+ doSetTaskId(pTaskInfo->pRoot);
+}
+
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
if (tinfo == NULL) {
return TSDB_CODE_APP_ERROR;
@@ -218,7 +242,7 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks,
return code;
}
-qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) {
+qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, SSchemaWrapper** pSchema) {
if (msg == NULL) {
// create raw scan
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
@@ -231,7 +255,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
pTaskInfo->cost.created = taosGetTimestampUs();
pTaskInfo->execModel = OPTR_EXEC_MODEL_QUEUE;
- pTaskInfo->pRoot = createRawScanOperatorInfo(readers, pTaskInfo);
+ pTaskInfo->pRoot = createRawScanOperatorInfo(pReaderHandle, pTaskInfo);
if (NULL == pTaskInfo->pRoot) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pTaskInfo);
@@ -248,7 +272,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
}
qTaskInfo_t pTaskInfo = NULL;
- code = qCreateExecTask(readers, 0, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_QUEUE);
+ code = qCreateExecTask(pReaderHandle, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_QUEUE);
if (code != TSDB_CODE_SUCCESS) {
nodesDestroyNode((SNode*)pPlan);
qDestroyTask(pTaskInfo);
@@ -274,13 +298,11 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
return pTaskInfo;
}
-qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) {
+qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId) {
if (msg == NULL) {
return NULL;
}
- /*qDebugL("stream task string %s", (const char*)msg);*/
-
struct SSubplan* pPlan = NULL;
int32_t code = qStringToSubplan(msg, &pPlan);
if (code != TSDB_CODE_SUCCESS) {
@@ -289,7 +311,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) {
}
qTaskInfo_t pTaskInfo = NULL;
- code = qCreateExecTask(readers, 0, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
+ code = qCreateExecTask(readers, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
if (code != TSDB_CODE_SUCCESS) {
nodesDestroyNode((SNode*)pPlan);
qDestroyTask(pTaskInfo);
@@ -468,11 +490,11 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
taosThreadOnce(&initPoolOnce, initRefPool);
- qDebug("start to create subplan task, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId);
+ qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId);
- int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, sql, model);
+ int32_t code = createExecTaskInfo(pSubplan, pTask, readHandle, taskId, vgId, sql, model);
if (code != TSDB_CODE_SUCCESS) {
- qError("failed to createExecTaskInfoImpl, code: %s", tstrerror(code));
+ qError("failed to createExecTaskInfo, code: %s", tstrerror(code));
goto _error;
}
@@ -1093,7 +1115,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
- qDebug("switch to next table %" PRId64 " ts %" PRId64 "% "PRId64 " rows returned", uid, ts, pInfo->pTableScanOp->resultInfo.totalRows);
+ qDebug("switch to table uid:%" PRId64 " ts:%" PRId64 "% "PRId64 " rows returned", uid, ts, pInfo->pTableScanOp->resultInfo.totalRows);
pInfo->pTableScanOp->resultInfo.totalRows = 0;
bool found = false;
@@ -1132,7 +1154,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
pTableScanInfo->base.cond.twindows.skey = oldSkey;
pTableScanInfo->scanTimes = 0;
- qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid,
+ qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid,
ts, pTableScanInfo->currentTable, numOfTables);
} else {
qError("invalid pOffset->type:%d", pOffset->type);
diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c
index 47d32f6d61..127e6ddabe 100644
--- a/source/libs/executor/src/executorimpl.c
+++ b/source/libs/executor/src/executorimpl.c
@@ -1959,7 +1959,7 @@ void destroyAggOperatorInfo(void* param) {
taosMemoryFreeClear(param);
}
-static char* buildTaskId(uint64_t taskId, uint64_t queryId) {
+char* buildTaskId(uint64_t taskId, uint64_t queryId) {
char* p = taosMemoryMalloc(64);
int32_t offset = 6;
@@ -1971,11 +1971,10 @@ static char* buildTaskId(uint64_t taskId, uint64_t queryId) {
offset += tintToHex(queryId, &p[offset]);
p[offset] = 0;
-
return p;
}
-static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model, char* dbFName) {
+static SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, char* dbFName) {
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
if (pTaskInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@@ -1990,6 +1989,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo));
pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES);
+ pTaskInfo->id.vgId = vgId;
pTaskInfo->id.queryId = queryId;
pTaskInfo->id.str = buildTaskId(taskId, queryId);
return pTaskInfo;
@@ -2178,7 +2178,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
#ifndef NDEBUG
int32_t sz = tableListGetSize(pTableListInfo);
- qDebug("create stream task, total:%d", sz);
+ qDebug("vgId:%d create stream task, total qualified tables:%d, %s", pTaskInfo->id.vgId, sz, idstr);
for (int32_t i = 0; i < sz; i++) {
STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
@@ -2439,17 +2439,14 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT
return TSDB_CODE_SUCCESS;
}
-int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
- char* sql, EOPTR_EXEC_MODEL model) {
- uint64_t queryId = pPlan->id.queryId;
-
- *pTaskInfo = createExecTaskInfo(queryId, taskId, model, pPlan->dbFName);
+int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
+ int32_t vgId, char* sql, EOPTR_EXEC_MODEL model) {
+ *pTaskInfo = doCreateExecTaskInfo(pPlan->id.queryId, taskId, vgId, model, pPlan->dbFName);
if (*pTaskInfo == NULL) {
goto _complete;
}
if (pHandle) {
- /*(*pTaskInfo)->streamInfo.fillHistoryVer1 = pHandle->fillHistoryVer1;*/
if (pHandle->pStateBackend) {
(*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend;
}
diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c
index 20766b38d6..24f42ff178 100644
--- a/source/libs/executor/src/sysscanoperator.c
+++ b/source/libs/executor/src/sysscanoperator.c
@@ -562,17 +562,15 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
continue;
}
- if ((numOfRows + schemaRow->nCols) > pOperator->resultInfo.capacity) {
+ sysTableUserColsFillOneTableCols(pInfo, dbname, &numOfRows, dataBlock, tableName, schemaRow, typeName);
+
+ if (numOfRows >= pOperator->resultInfo.capacity) {
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
numOfRows = 0;
- metaTbCursorPrev(pInfo->pCur);
-
if (pInfo->pRes->info.rows > 0) {
break;
}
- } else {
- sysTableUserColsFillOneTableCols(pInfo, dbname, &numOfRows, dataBlock, tableName, schemaRow, typeName);
}
}
diff --git a/source/libs/index/src/indexFilter.c b/source/libs/index/src/indexFilter.c
index 0d13db2f35..ec5fc5ad2a 100644
--- a/source/libs/index/src/indexFilter.c
+++ b/source/libs/index/src/indexFilter.c
@@ -634,7 +634,7 @@ static FORCE_INLINE int32_t sifGetOperFn(int32_t funcId, sif_func_t *func, SIdxF
}
static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
- int32_t code = 0;
+ int32_t code = -1;
if (sifValidOp(node->opType) < 0) {
code = TSDB_CODE_QRY_INVALID_INPUT;
ctx->code = code;
@@ -654,7 +654,7 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
SIFParam *params = NULL;
SIF_ERR_RET(sifInitOperParams(¶ms, node, ctx));
- if (params[0].status == SFLT_NOT_INDEX && (nParam > 1 && params[1].status == SFLT_NOT_INDEX)) {
+ if (params[0].status == SFLT_NOT_INDEX || (nParam > 1 && params[1].status == SFLT_NOT_INDEX)) {
output->status = SFLT_NOT_INDEX;
goto _return;
}
@@ -664,6 +664,7 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
sif_func_t operFn = sifNullFunc;
if (!ctx->noExec) {
+ code = 0;
SIF_ERR_JRET(sifGetOperFn(node->opType, &operFn, &output->status));
SIF_ERR_JRET(operFn(¶ms[0], nParam > 1 ? ¶ms[1] : NULL, output));
} else {
@@ -672,11 +673,17 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
output->status = SFLT_NOT_INDEX;
goto _return;
}
+ code = 0;
SIF_ERR_JRET(sifGetOperFn(node->opType, &operFn, &output->status));
}
_return:
for (int i = 0; i < nParam; i++) sifFreeParam(¶ms[i]);
taosMemoryFree(params);
+ if (code != 0) {
+ output->status = SFLT_NOT_INDEX;
+ } else {
+ output->status = SFLT_COARSE_INDEX;
+ }
return code;
}
@@ -717,7 +724,7 @@ _return:
static EDealRes sifWalkFunction(SNode *pNode, void *context) {
SFunctionNode *node = (SFunctionNode *)pNode;
- SIFParam output = {.result = taosArrayInit(8, sizeof(uint64_t))};
+ SIFParam output = {.result = taosArrayInit(8, sizeof(uint64_t)), .status = SFLT_COARSE_INDEX};
SIFCtx *ctx = context;
ctx->code = sifExecFunction(node, ctx, &output);
@@ -735,7 +742,7 @@ static EDealRes sifWalkFunction(SNode *pNode, void *context) {
static EDealRes sifWalkLogic(SNode *pNode, void *context) {
SLogicConditionNode *node = (SLogicConditionNode *)pNode;
- SIFParam output = {.result = taosArrayInit(8, sizeof(uint64_t))};
+ SIFParam output = {.result = taosArrayInit(8, sizeof(uint64_t)), .status = SFLT_COARSE_INDEX};
SIFCtx *ctx = context;
ctx->code = sifExecLogic(node, ctx, &output);
@@ -831,6 +838,7 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) {
if (res->result != NULL) {
taosArrayAddAll(pDst->result, res->result);
}
+ pDst->status = res->status;
sifFreeParam(res);
taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
@@ -887,16 +895,20 @@ int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result,
SFilterInfo *filter = NULL;
SArray *output = taosArrayInit(8, sizeof(uint64_t));
- SIFParam param = {.arg = *metaArg, .result = output};
+ SIFParam param = {.arg = *metaArg, .result = output, .status = SFLT_NOT_INDEX};
int32_t code = sifCalculate((SNode *)pFilterNode, ¶m);
if (code != 0) {
sifFreeParam(¶m);
return code;
}
+ if (param.status == SFLT_NOT_INDEX) {
+ *status = param.status;
+ } else {
+ *status = st;
+ }
taosArrayAddAll(result, param.result);
sifFreeParam(¶m);
- *status = st;
return TSDB_CODE_SUCCESS;
}
diff --git a/tests/system-test/0-others/check_assert.py b/tests/system-test/0-others/check_assert.py
new file mode 100644
index 0000000000..59fb223528
--- /dev/null
+++ b/tests/system-test/0-others/check_assert.py
@@ -0,0 +1,219 @@
+###################################################################
+# Copyright (c) 2016 by TAOS Technologies, Inc.
+# All rights reserved.
+#
+# This file is proprietary and confidential to TAOS Technologies.
+# No part of this file may be reproduced, stored, transmitted,
+# disclosed or used in any form or by any means other than as
+# expressly provided by the written permission from Jianhui Tao
+#
+###################################################################
+
+# -*- coding: utf-8 -*-
+
+'''
+from util.log import *
+from util.cases import *
+from util.sql import *
+from util.common import *
+from util.sqlset import *
+
+'''
+
+import sys
+import random
+import os
+
+#define code
+NO_FOUND = 0 # not found assert or ASSERT
+FOUND_OK = 1 # found ASSERT and valid usage
+FOUND_NOIF = 2 # found ASSERT but no if like ASSERT(...)
+FOUND_LOWER = 3 # found assert write with lower letters
+FOUND_HAVENOT = 4 # found ASSERT have if but have not like if(!ASSERT)
+
+code_strs = ["not found", "valid", "found but no if", "lower assert","found but have not"]
+
+
+#
+# check assert
+#
+class CheckAssert:
+ def __init__(self):
+ self.files = 0
+ self.total = [0,0,0,0,0]
+
+ def wholeMatched(self, line, pos, n):
+ before = False
+ after = False
+ if pos != -1:
+ if pos == 0 or line[pos-1] == ' ' or line[pos-1] == '!' or line[pos-1] == '(':
+ before = True
+ if n > pos + 6 + 1:
+ last = line[pos+6]
+ if last == ' ' or last == '(':
+ after = True
+
+ if before and after:
+ return True
+ else:
+ return False
+
+
+ def parseLine(self, line):
+
+ # see FOUND_xxx define below
+ code = 0
+
+ n = len(line)
+ if(n < 7):
+ return NO_FOUND
+ s1 = line.find("//") # commit line
+
+ # assert
+ pos = line.find("assert")
+ if pos != -1:
+ if self.wholeMatched(line, pos, n):
+ # comment line check
+ if not (s1 != -1 and s1 < pos):
+ return FOUND_LOWER
+
+ # ASSERT
+ pos = 0
+ while pos < n:
+ pos = line.find("ASSERT", pos)
+ #print(f" pos={pos} {line}")
+ if pos == -1:
+ return NO_FOUND
+ if self.wholeMatched(line, pos, n):
+ break
+ # next
+ pos += 6
+
+ #
+ # found
+ #
+
+ # comment line
+ if s1 != -1 and s1 < pos:
+ return NO_FOUND
+ # check if
+ s2 = line.find("if")
+ if s2 == -1 or s2 > pos or pos - s2 > 5:
+ return FOUND_NOIF
+ s3 = line.find("!")
+ # if(!ASSERT
+ if s3 > s2 and s3 < pos:
+ return FOUND_HAVENOT
+
+ return FOUND_OK
+
+
+ def checkFile(self, pathfile):
+ # check .h .c
+ ext = pathfile[-2:].lower()
+ if ext != ".h" and ext != ".c":
+ return
+
+ print(" check file %s"%pathfile)
+ self.files += 1
+ err = 0
+ ok = 0
+ i = 0
+
+ # read file context
+ with open(pathfile, "r") as fp:
+ lines = fp.readlines()
+ for line in lines:
+ i += 1
+ code = self.parseLine(line)
+ self.total[code] += 1
+ if code == FOUND_OK:
+ ok += 1
+ if code != NO_FOUND and code != FOUND_OK:
+ err += 1
+ if code != NO_FOUND:
+ print(f" line: {i} code: {code} {line}")
+
+ # parse end output total
+ if err > 0 or ok > 0:
+ print(f" found problem: {err} \n")
+
+
+ def scanPath(self, path):
+ #print(" check path %s"%path)
+
+ ignores = ["/test/"]
+ for ignore in ignores:
+ if ignore in path:
+ print(f" ignore {path} keyword: {ignore}")
+ return
+
+ with os.scandir(path) as childs:
+ for child in childs:
+ if child.is_file():
+ self.checkFile(os.path.join(path, child.name))
+ elif child.is_dir():
+ self.scanPath(child.path)
+
+
+ def doCheck(self, path):
+ print(f" start check path:{path}")
+ self.scanPath(path)
+
+ # print total
+ print("\n")
+ print(f" --------------- total ({self.files} files)--------------")
+ for i in range(5):
+ print(f" code : {i} num: {self.total[i]} ({code_strs[i]})")
+ print(" --------------- end ----------------")
+ print(f"\n")
+
+#
+# main function
+#
+
+if __name__ == "__main__":
+ print(" hello, welcome to use check assert tools 1.0.")
+ path = os.path.dirname(os.path.realpath(__file__))
+ label = "TDengine"
+ pos = path.find(label)
+ if pos != -1:
+ pos += len(label) + 1
+ src = path[0:pos] + "source"
+ else:
+ src = path
+
+ checker = CheckAssert()
+ checker.doCheck(src)
+ print(" check assert finished")
+
+
+
+'''
+class TDTestCase:
+ def init(self, conn, logSql, replicaVar=1):
+ tdLog.debug("start to execute %s" % __file__)
+ tdSql.init(conn.cursor())
+ self.checker = CheckAssert()
+
+ # run
+ def run(self):
+ # calc
+ selfPath = os.path.dirname(os.path.realpath(__file__))
+ projPath = ""
+ if ("community" in selfPath):
+ projPath = selfPath[:selfPath.find("community")]
+ else:
+ projPath = selfPath[:selfPath.find("tests")]
+
+ src = self.projPath + "src/"
+ self.checker.checkAssert(src)
+
+ def stop(self):
+ tdSql.close()
+ tdLog.success("%s successfully executed" % __file__)
+
+tdCases.addWindows(__file__, TDTestCase())
+tdCases.addLinux(__file__, TDTestCase())
+
+'''
\ No newline at end of file
diff --git a/tests/system-test/0-others/compatibility.py b/tests/system-test/0-others/compatibility.py
index 21d307ce37..03aadf8746 100644
--- a/tests/system-test/0-others/compatibility.py
+++ b/tests/system-test/0-others/compatibility.py
@@ -97,7 +97,7 @@ class TDTestCase:
def buildTaosd(self,bPath):
# os.system(f"mv {bPath}/build_bak {bPath}/build ")
- os.system(f" cd {bPath} && make install ")
+ os.system(f" cd {bPath} ")
def run(self):
@@ -142,6 +142,10 @@ class TDTestCase:
tdLog.info(" LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y ")
os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database db4096 '")
+ cmd = f" LD_LIBRARY_PATH={bPath}/build/lib {bPath}/build/bin/taos -h localhost ;"
+ if os.system(cmd) == 0:
+ raise Exception("failed to execute system command. cmd: %s" % cmd)
+
os.system("pkill taosd") # make sure all the data are saved in disk.
self.checkProcessPid("taosd")
@@ -152,8 +156,10 @@ class TDTestCase:
sleep(1)
tdsql=tdCom.newTdSql()
print(tdsql)
-
-
+ cmd = f" LD_LIBRARY_PATH=/usr/lib taos -h localhost ;"
+ if os.system(cmd) == 0:
+ raise Exception("failed to execute system command. cmd: %s" % cmd)
+
tdsql.query(f"SELECT SERVER_VERSION();")
nowServerVersion=tdsql.queryResult[0][0]
tdLog.info(f"New server version is {nowServerVersion}")
diff --git a/tests/system-test/0-others/information_schema.py b/tests/system-test/0-others/information_schema.py
index 23ddb12d79..720eab74c4 100644
--- a/tests/system-test/0-others/information_schema.py
+++ b/tests/system-test/0-others/information_schema.py
@@ -101,18 +101,10 @@ class TDTestCase:
tdSql.checkEqual(i[1],len(self.perf_list))
elif i[0].lower() == self.dbname:
tdSql.checkEqual(i[1],self.tbnum+1)
- def ins_columns_check(self):
- tdSql.execute('create database db2 vgroups 2 replica 1')
- tdSql.execute('create table db2.stb2 (ts timestamp,c0 int,c1 int, c2 double, c3 float, c4 binary(1000), c5 nchar(100),c7 bigint, c8 bool, c9 smallint) tags(t0 int)')
- for i in range(2000):
- tdSql.execute("create table db2.ctb%d using db2.stb2 tags(%d)" %(i,i))
- tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type="CHILD_TABLE"')
- tdSql.checkEqual(20000,len(tdSql.queryResult))
- print("number of ins_columns of child table in db2 is %s" % len(tdSql.queryResult))
def run(self):
self.prepare_data()
self.count_check()
- self.ins_columns_check()
+
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
diff --git a/tests/system-test/0-others/tag_index_basic.py b/tests/system-test/0-others/tag_index_basic.py
index 96c3dca016..72ed559ffd 100644
--- a/tests/system-test/0-others/tag_index_basic.py
+++ b/tests/system-test/0-others/tag_index_basic.py
@@ -24,7 +24,7 @@ class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
- tdSql.init(conn.cursor())
+ tdSql.init(conn.cursor(), True)
self.setsql = TDSetSql()
self.column_dict = {
'ts': 'timestamp',
diff --git a/tests/system-test/2-query/tsbsQuery.py b/tests/system-test/2-query/tsbsQuery.py
index 4f415550b8..e64b3ed0c7 100644
--- a/tests/system-test/2-query/tsbsQuery.py
+++ b/tests/system-test/2-query/tsbsQuery.py
@@ -211,8 +211,9 @@ class TDTestCase:
tdSql.query(f"select model,state_changed,count(state_changed) FROM (SELECT model,diff(broken_down) AS state_changed FROM (SELECT _wstart,model,cast(cast(floor(2*(sum(nzs)/count(nzs))) as bool) as int) AS broken_down FROM (SELECT ts,model, cast(cast(status as bool) as int) AS nzs from {dbname}.diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' ) WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition BY model interval(10m)) partition BY model) where state_changed =1 partition BY model,state_changed ;")
- sql=f"select model,ctc from (SELECT model,count(state_changed) as ctc FROM (SELECT model,diff(broken_down) AS state_changed FROM (SELECT model,cast(cast(floor(2*(sum(nzs)/count(nzs))) as bool) as int) AS broken_down FROM (SELECT ts,model, cast(cast(status as bool) as int) AS nzs from {dbname}.diagnostics WHERE ts >= 1451606400000 AND ts < 1451952001000 ) WHERE ts >= 1451606400000 AND ts < 1451952001000 partition BY model interval(10m)) partition BY model) WHERE state_changed = 1 partition BY model )where model is null;"
-
+ sql=f"SELECT model,count(state_changed) FROM (SELECT _rowts,model,diff(broken_down) AS state_changed FROM (SELECT ts,model,tb,cast(cast(floor(2*(nzs)) as bool) as int) AS broken_down FROM (SELECT _wstart as ts,model,tbname as tb, sum(cast(cast(status as bool) as int))/count(cast(cast(status as bool) as int)) AS nzs FROM {dbname}.diagnostics WHERE ts >= 1451606400000 AND ts < 1451952001000 partition BY tbname,model interval(10m))order by ts) partition BY tb,model ) WHERE state_changed = 1 partition BY model;"
+ tdSql.query(f"{sql}")
+ tdSql.checkRows(46)
# for i in range(2):
# tdSql.query("%s"%sql)
# quertR1=tdSql.queryResult
diff --git a/tools/shell/inc/shellInt.h b/tools/shell/inc/shellInt.h
index 08d4b167ea..6345647e2f 100644
--- a/tools/shell/inc/shellInt.h
+++ b/tools/shell/inc/shellInt.h
@@ -28,6 +28,10 @@
#ifdef WEBSOCKET
#include "taosws.h"
+
+#define SHELL_WS_TIMEOUT 30
+#define SHELL_WS_DSN_BUFF 256
+#define SHELL_WS_DSN_MASK 10
#endif
#define SHELL_MAX_HISTORY_SIZE 1000
@@ -99,7 +103,7 @@ typedef struct {
bool exit;
#ifdef WEBSOCKET
WS_TAOS* ws_conn;
- bool stop_query;
+ bool stop_query;
#endif
} SShellObj;
@@ -139,7 +143,7 @@ void shellExit();
void shellTestNetWork();
#ifdef WEBSOCKET
-void shellCheckConnectMode();
+void shellCheckConnectMode();
// shellWebsocket.c
int shell_conn_ws_server(bool first);
int32_t shell_run_websocket();
diff --git a/tools/shell/src/shellArguments.c b/tools/shell/src/shellArguments.c
index 636138fac7..c16769ccba 100644
--- a/tools/shell/src/shellArguments.c
+++ b/tools/shell/src/shellArguments.c
@@ -60,7 +60,7 @@
#ifdef WEBSOCKET
#define SHELL_DSN "The dsn to use when connecting to cloud server."
#define SHELL_REST "Use restful mode when connecting."
-#define SHELL_TIMEOUT "Set the timeout for websocket query in seconds, default is 10."
+#define SHELL_TIMEOUT "Set the timeout for websocket query in seconds, default is 30."
#endif
static int32_t shellParseSingleOpt(int32_t key, char *arg);
@@ -127,7 +127,7 @@ static struct argp_option shellOptions[] = {
#ifdef WEBSOCKET
{"dsn", 'E', "DSN", 0, SHELL_DSN},
{"restful", 'R', 0, 0, SHELL_REST},
- {"timeout", 'T', "SECONDS", 0, SHELL_TIMEOUT},
+ {"timeout", 'T', "SECONDS", 0, SHELL_TIMEOUT},
#endif
{"pktnum", 'N', "PKTNUM", 0, SHELL_PKT_NUM},
{0},
@@ -223,9 +223,9 @@ static int32_t shellParseSingleOpt(int32_t key, char *arg) {
pArgs->dsn = arg;
pArgs->cloud = true;
break;
- case 'T':
- pArgs->timeout = atoi(arg);
- break;
+ case 'T':
+ pArgs->timeout = atoi(arg);
+ break;
#endif
case 'V':
pArgs->is_version = true;
@@ -246,7 +246,8 @@ int32_t shellParseArgsWithoutArgp(int argc, char *argv[]) {
SShellArgs *pArgs = &shell.args;
for (int i = 1; i < argc; i++) {
- if (strcmp(argv[i], "--help") == 0 || strcmp(argv[i], "--usage") == 0 || strcmp(argv[i], "-?") == 0 || strcmp(argv[i], "/?") == 0) {
+ if (strcmp(argv[i], "--help") == 0 || strcmp(argv[i], "--usage") == 0
+ || strcmp(argv[i], "-?") == 0 || strcmp(argv[i], "/?") == 0) {
shellParseSingleOpt('?', NULL);
return 0;
}
@@ -262,12 +263,14 @@ int32_t shellParseArgsWithoutArgp(int argc, char *argv[]) {
return -1;
}
- if (key[1] == 'h' || key[1] == 'P' || key[1] == 'u' || key[1] == 'a' || key[1] == 'c' || key[1] == 's' ||
- key[1] == 'f' || key[1] == 'd' || key[1] == 'w' || key[1] == 'n' || key[1] == 'l' || key[1] == 'N'
+ if (key[1] == 'h' || key[1] == 'P' || key[1] == 'u'
+ || key[1] == 'a' || key[1] == 'c' || key[1] == 's'
+ || key[1] == 'f' || key[1] == 'd' || key[1] == 'w'
+ || key[1] == 'n' || key[1] == 'l' || key[1] == 'N'
#ifdef WEBSOCKET
- || key[1] == 'E' || key[1] == 'T'
+ || key[1] == 'E' || key[1] == 'T'
#endif
- ) {
+ ) {
if (i + 1 >= argc) {
fprintf(stderr, "option %s requires an argument\r\n", key);
return -1;
@@ -279,12 +282,14 @@ int32_t shellParseArgsWithoutArgp(int argc, char *argv[]) {
}
shellParseSingleOpt(key[1], val);
i++;
- } else if (key[1] == 'p' || key[1] == 'A' || key[1] == 'C' || key[1] == 'r' || key[1] == 'k' ||
- key[1] == 't' || key[1] == 'V' || key[1] == '?' || key[1] == 1
+ } else if (key[1] == 'p' || key[1] == 'A' || key[1] == 'C'
+ || key[1] == 'r' || key[1] == 'k'
+ || key[1] == 't' || key[1] == 'V'
+ || key[1] == '?' || key[1] == 1
#ifdef WEBSOCKET
- ||key[1] == 'R'
+ ||key[1] == 'R'
#endif
- ) {
+ ) {
shellParseSingleOpt(key[1], NULL);
} else {
fprintf(stderr, "invalid option %s\r\n", key);
@@ -406,7 +411,7 @@ static int32_t shellCheckArgs() {
int32_t shellParseArgs(int32_t argc, char *argv[]) {
shellInitArgs(argc, argv);
- shell.info.clientVersion =
+ shell.info.clientVersion =
"Welcome to the %s Command Line Interface, Client Version:%s\r\n"
"Copyright (c) 2022 by %s, all rights reserved.\r\n\r\n";
strcpy(shell.info.cusName, cusName);
diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c
index 05183a036c..3080b15b8c 100644
--- a/tools/shell/src/shellEngine.c
+++ b/tools/shell/src/shellEngine.c
@@ -102,10 +102,10 @@ int32_t shellRunSingleCommand(char *command) {
}
#ifdef WEBSOCKET
if (shell.args.restful || shell.args.cloud) {
- shellRunSingleCommandWebsocketImp(command);
+ shellRunSingleCommandWebsocketImp(command);
} else {
#endif
- shellRunSingleCommandImp(command);
+ shellRunSingleCommandImp(command);
#ifdef WEBSOCKET
}
#endif
@@ -1025,15 +1025,15 @@ void *shellCancelHandler(void *arg) {
}
#ifdef WEBSOCKET
- if (shell.args.restful || shell.args.cloud) {
- shell.stop_query = true;
- } else {
+ if (shell.args.restful || shell.args.cloud) {
+ shell.stop_query = true;
+ } else {
#endif
- if (shell.conn) {
- taos_kill_query(shell.conn);
- }
+ if (shell.conn) {
+ taos_kill_query(shell.conn);
+ }
#ifdef WEBSOCKET
- }
+ }
#endif
#ifdef WINDOWS
printf("\n%s", shell.info.promptHeader);
@@ -1083,21 +1083,21 @@ int32_t shellExecute() {
SShellArgs *pArgs = &shell.args;
#ifdef WEBSOCKET
if (shell.args.restful || shell.args.cloud) {
- if (shell_conn_ws_server(1)) {
- return -1;
- }
+ if (shell_conn_ws_server(1)) {
+ return -1;
+ }
} else {
#endif
- if (shell.args.auth == NULL) {
- shell.conn = taos_connect(pArgs->host, pArgs->user, pArgs->password, pArgs->database, pArgs->port);
- } else {
- shell.conn = taos_connect_auth(pArgs->host, pArgs->user, pArgs->auth, pArgs->database, pArgs->port);
- }
+ if (shell.args.auth == NULL) {
+ shell.conn = taos_connect(pArgs->host, pArgs->user, pArgs->password, pArgs->database, pArgs->port);
+ } else {
+ shell.conn = taos_connect_auth(pArgs->host, pArgs->user, pArgs->auth, pArgs->database, pArgs->port);
+ }
- if (shell.conn == NULL) {
- fflush(stdout);
- return -1;
- }
+ if (shell.conn == NULL) {
+ fflush(stdout);
+ return -1;
+ }
#ifdef WEBSOCKET
}
#endif
@@ -1118,13 +1118,13 @@ int32_t shellExecute() {
shellSourceFile(pArgs->file);
}
#ifdef WEBSOCKET
- if (shell.args.restful || shell.args.cloud) {
- ws_close(shell.ws_conn);
- } else {
+ if (shell.args.restful || shell.args.cloud) {
+ ws_close(shell.ws_conn);
+ } else {
#endif
- taos_close(shell.conn);
+ taos_close(shell.conn);
#ifdef WEBSOCKET
- }
+ }
#endif
shellWriteHistory();
@@ -1148,9 +1148,9 @@ int32_t shellExecute() {
if (!shell.args.restful && !shell.args.cloud) {
#endif
#ifndef WINDOWS
- printfIntroduction();
+ printfIntroduction();
#endif
- shellGetGrantInfo();
+ shellGetGrantInfo();
#ifdef WEBSOCKET
}
#endif
diff --git a/tools/shell/src/shellMain.c b/tools/shell/src/shellMain.c
index 22b8e89959..bc5809ffe8 100644
--- a/tools/shell/src/shellMain.c
+++ b/tools/shell/src/shellMain.c
@@ -45,7 +45,7 @@ void shellCrashHandler(int signum, void *sigInfo, void *context) {
int main(int argc, char *argv[]) {
shell.exit = false;
#ifdef WEBSOCKET
- shell.args.timeout = 10;
+ shell.args.timeout = SHELL_WS_TIMEOUT;
shell.args.cloud = true;
#endif
diff --git a/tools/shell/src/shellWebsocket.c b/tools/shell/src/shellWebsocket.c
index 81c23035e2..1d81ce4b2f 100644
--- a/tools/shell/src/shellWebsocket.c
+++ b/tools/shell/src/shellWebsocket.c
@@ -14,22 +14,51 @@
* along with this program. If not, see .
*/
#ifdef WEBSOCKET
-#include "taosws.h"
-#include "shellInt.h"
+#include
+#include
int shell_conn_ws_server(bool first) {
- shell.ws_conn = ws_connect_with_dsn(shell.args.dsn);
- if (!shell.ws_conn) {
- fprintf(stderr, "failed to connect %s, reason: %s\n",
- shell.args.dsn, ws_errstr(NULL));
+ char cuttedDsn[SHELL_WS_DSN_BUFF] = {0};
+ int dsnLen = strlen(shell.args.dsn);
+ snprintf(cuttedDsn,
+ ((dsnLen-SHELL_WS_DSN_MASK) > SHELL_WS_DSN_BUFF)?
+ SHELL_WS_DSN_BUFF:(dsnLen-SHELL_WS_DSN_MASK),
+ "%s", shell.args.dsn);
+ fprintf(stdout, "trying to connect %s*** ", cuttedDsn);
+ fflush(stdout);
+ for (int i = 0; i < shell.args.timeout; i++) {
+ shell.ws_conn = ws_connect_with_dsn(shell.args.dsn);
+ if (NULL == shell.ws_conn) {
+ int errNo = ws_errno(NULL);
+ if (0xE001 == errNo) {
+ fprintf(stdout, ".");
+ fflush(stdout);
+ taosMsleep(1000); // sleep 1 second then try again
+ continue;
+ } else {
+ fprintf(stderr, "\nfailed to connect %s***, reason: %s\n",
+ cuttedDsn, ws_errstr(NULL));
+ return -1;
+ }
+ } else {
+ break;
+ }
+ }
+ if (NULL == shell.ws_conn) {
+ fprintf(stdout, "\n timeout\n");
+ fprintf(stderr, "\nfailed to connect %s***, reason: %s\n",
+ cuttedDsn, ws_errstr(NULL));
return -1;
+ } else {
+ fprintf(stdout, "\n");
}
if (first && shell.args.restful) {
- fprintf(stdout, "successfully connect to %s\n\n",
+ fprintf(stdout, "successfully connected to %s\n\n",
shell.args.dsn);
} else if (first && shell.args.cloud) {
- fprintf(stdout, "successfully connect to cloud service\n");
+ fprintf(stdout, "successfully connected to cloud service\n");
}
+ fflush(stdout);
return 0;
}
@@ -118,7 +147,8 @@ static int verticalPrintWebsocket(WS_RES* wres, double* pexecute_time) {
return numOfRows;
}
-static int dumpWebsocketToFile(const char* fname, WS_RES* wres, double* pexecute_time) {
+static int dumpWebsocketToFile(const char* fname, WS_RES* wres,
+ double* pexecute_time) {
char fullname[PATH_MAX] = {0};
if (taosExpandDir(fname, fullname, PATH_MAX) != 0) {
tstrncpy(fullname, fname, PATH_MAX);
@@ -150,7 +180,7 @@ static int dumpWebsocketToFile(const char* fname, WS_RES* wres, double* pexecute
}
taosFprintfFile(pFile, "%s", fields[col].name);
}
- taosFprintfFile(pFile, "\r\n");
+ taosFprintfFile(pFile, "\r\n");
do {
uint8_t ty;
uint32_t len;
@@ -161,7 +191,8 @@ static int dumpWebsocketToFile(const char* fname, WS_RES* wres, double* pexecute
taosFprintfFile(pFile, ",");
}
const void *value = ws_get_value_in_block(wres, i, j, &ty, &len);
- shellDumpFieldToFile(pFile, (const char*)value, fields + j, len, precision);
+ shellDumpFieldToFile(pFile, (const char*)value,
+ fields + j, len, precision);
}
taosFprintfFile(pFile, "\r\n");
}
@@ -171,7 +202,9 @@ static int dumpWebsocketToFile(const char* fname, WS_RES* wres, double* pexecute
return numOfRows;
}
-static int shellDumpWebsocket(WS_RES *wres, char *fname, int *error_no, bool vertical, double* pexecute_time) {
+static int shellDumpWebsocket(WS_RES *wres, char *fname,
+ int *error_no, bool vertical,
+ double* pexecute_time) {
int numOfRows = 0;
if (fname != NULL) {
numOfRows = dumpWebsocketToFile(fname, wres, pexecute_time);
@@ -227,13 +260,16 @@ void shellRunSingleCommandWebsocketImp(char *command) {
// if it's not a ws connection error
if (TSDB_CODE_WS_DSN_ERROR != (code&TSDB_CODE_WS_DSN_ERROR)) {
et = taosGetTimestampUs();
- fprintf(stderr, "\nDB: error: %s (%.6fs)\n", ws_errstr(res), (et - st)/1E6);
+ fprintf(stderr, "\nDB: error: %s (%.6fs)\n",
+ ws_errstr(res), (et - st)/1E6);
ws_free_result(res);
return;
}
- if (code == TSDB_CODE_WS_SEND_TIMEOUT || code == TSDB_CODE_WS_RECV_TIMEOUT) {
+ if (code == TSDB_CODE_WS_SEND_TIMEOUT
+ || code == TSDB_CODE_WS_RECV_TIMEOUT) {
fprintf(stderr, "Hint: use -t to increase the timeout in seconds\n");
- } else if (code == TSDB_CODE_WS_INTERNAL_ERRO || code == TSDB_CODE_WS_CLOSED) {
+ } else if (code == TSDB_CODE_WS_INTERNAL_ERRO
+ || code == TSDB_CODE_WS_CLOSED) {
shell.ws_conn = NULL;
}
ws_free_result(res);
@@ -252,7 +288,8 @@ void shellRunSingleCommandWebsocketImp(char *command) {
execute_time = ws_take_timing(res)/1E6;
}
- if (shellRegexMatch(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$", REG_EXTENDED | REG_ICASE)) {
+ if (shellRegexMatch(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$",
+ REG_EXTENDED | REG_ICASE)) {
fprintf(stdout, "Database changed.\r\n\r\n");
fflush(stdout);
ws_free_result(res);
@@ -266,10 +303,12 @@ void shellRunSingleCommandWebsocketImp(char *command) {
double total_time = (et - st)/1E3;
double net_time = total_time - (double)execute_time;
printf("Query Ok, %d of %d row(s) in database\n", numOfRows, numOfRows);
- printf("Execute: %.2f ms Network: %.2f ms Total: %.2f ms\n", execute_time, net_time, total_time);
+ printf("Execute: %.2f ms Network: %.2f ms Total: %.2f ms\n",
+ execute_time, net_time, total_time);
} else {
int error_no = 0;
- numOfRows = shellDumpWebsocket(res, fname, &error_no, printMode, &execute_time);
+ numOfRows = shellDumpWebsocket(res, fname, &error_no,
+ printMode, &execute_time);
if (numOfRows < 0) {
ws_free_result(res);
return;
@@ -279,11 +318,13 @@ void shellRunSingleCommandWebsocketImp(char *command) {
double net_time = total_time - execute_time;
if (error_no == 0 && !shell.stop_query) {
printf("Query OK, %d row(s) in set\n", numOfRows);
- printf("Execute: %.2f ms Network: %.2f ms Total: %.2f ms\n", execute_time, net_time, total_time);
+ printf("Execute: %.2f ms Network: %.2f ms Total: %.2f ms\n",
+ execute_time, net_time, total_time);
} else {
printf("Query interrupted, %d row(s) in set (%.6fs)\n", numOfRows,
(et - st)/1E6);
- printf("Execute: %.2f ms Network: %.2f ms Total: %.2f ms\n", execute_time, net_time, total_time);
+ printf("Execute: %.2f ms Network: %.2f ms Total: %.2f ms\n",
+ execute_time, net_time, total_time);
}
}
printf("\n");
diff --git a/utils/test/c/sml_test.c b/utils/test/c/sml_test.c
index 3d2c08149b..873946121b 100644
--- a/utils/test/c/sml_test.c
+++ b/utils/test/c/sml_test.c
@@ -245,8 +245,7 @@ int smlProcess_json3_Test() {
taos_free_result(pRes);
const char *sql[] = {
- "[{\"metric\":\"sys.cpu.nice3\",\"timestamp\":0,\"value\":\"18\",\"tags\":{\"host\":\"web01\",\"id\":\"t1\","
- "\"dc\":\"lga\"}}]"};
+ "[{\"metric\":\"sys.cpu.nice3\",\"timestamp\":0,\"value\":\"18\",\"tags\":{\"host\":\"web01\",\"id\":\"t1\",\"dc\":\"lga\"}}]"};
char *sql1[1] = {0};
for (int i = 0; i < 1; i++) {
sql1[i] = taosMemoryCalloc(1, 1024);
diff --git a/utils/test/c/tmqSim.c b/utils/test/c/tmqSim.c
index ce61b80d41..cb7b501298 100644
--- a/utils/test/c/tmqSim.c
+++ b/utils/test/c/tmqSim.c
@@ -858,7 +858,9 @@ void loop_consume(SThreadInfo* pInfo) {
taosFprintfFile(g_fp, "==== consumerId: %d, consumeMsgCnt: %" PRId64 ", consumeRowCnt: %" PRId64 "\n",
pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt);
- taosFsyncFile(pInfo->pConsumeRowsFile);
+ if(taosFsyncFile(pInfo->pConsumeRowsFile) < 0){
+ printf("taosFsyncFile error:%s", strerror(errno));
+ }
taosCloseFile(&pInfo->pConsumeRowsFile);
}