Merge remote-tracking branch 'origin/enh/TS-2967' into fix/cencVer

This commit is contained in:
dapan1121 2023-03-28 17:47:41 +08:00
commit def8c9044e
36 changed files with 914 additions and 373 deletions

View File

@ -2,7 +2,7 @@
# taos-tools # taos-tools
ExternalProject_Add(taos-tools ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG d11f210 GIT_TAG 04296a5
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE

View File

@ -241,6 +241,7 @@ int32_t fmGetUdafExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmSetInvertFunc(int32_t funcId, SFuncExecFuncs* pFpSet); int32_t fmSetInvertFunc(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmSetNormalFunc(int32_t funcId, SFuncExecFuncs* pFpSet); int32_t fmSetNormalFunc(int32_t funcId, SFuncExecFuncs* pFpSet);
bool fmIsInvertible(int32_t funcId); bool fmIsInvertible(int32_t funcId);
char* fmGetFuncName(int32_t funcId);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -183,7 +183,7 @@ static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
} }
string = cJSON_PrintUnformatted(json); string = cJSON_PrintUnformatted(json);
end: end:
cJSON_Delete(json); cJSON_Delete(json);
tFreeSMAltertbReq(&req); tFreeSMAltertbReq(&req);
return string; return string;
@ -205,7 +205,7 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) {
} }
string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE); string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE);
uDebug("processCreateStb %s", string); uDebug("processCreateStb %s", string);
_err: _err:
tDecoderClear(&coder); tDecoderClear(&coder);
return string; return string;
} }
@ -227,7 +227,7 @@ static char* processAlterStb(SMqMetaRsp* metaRsp) {
string = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen); string = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen);
uDebug("processAlterStb %s", string); uDebug("processAlterStb %s", string);
_err: _err:
tDecoderClear(&coder); tDecoderClear(&coder);
return string; return string;
} }
@ -309,7 +309,7 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
cJSON_AddItemToArray(tags, tag); cJSON_AddItemToArray(tags, tag);
} }
end: end:
cJSON_AddItemToObject(json, "tags", tags); cJSON_AddItemToObject(json, "tags", tags);
taosArrayDestroy(pTagVals); taosArrayDestroy(pTagVals);
} }
@ -368,7 +368,7 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) {
uDebug("processCreateTable :%s", string); uDebug("processCreateTable :%s", string);
} }
_exit: _exit:
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq; pCreateReq = req.pReqs + iReq;
taosMemoryFreeClear(pCreateReq->comment); taosMemoryFreeClear(pCreateReq->comment);
@ -408,7 +408,7 @@ static char* processAutoCreateTable(STaosxRsp* rsp) {
} }
string = buildCreateCTableJson(pCreateReq, rsp->createTableNum); string = buildCreateCTableJson(pCreateReq, rsp->createTableNum);
uDebug("processAutoCreateTable :%s", string); uDebug("processAutoCreateTable :%s", string);
_exit: _exit:
for (int i = 0; i < rsp->createTableNum; i++) { for (int i = 0; i < rsp->createTableNum; i++) {
tDecoderClear(&decoder[i]); tDecoderClear(&decoder[i]);
taosMemoryFreeClear(pCreateReq[i].comment); taosMemoryFreeClear(pCreateReq[i].comment);
@ -535,7 +535,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
string = cJSON_PrintUnformatted(json); string = cJSON_PrintUnformatted(json);
uDebug("processAlterTable :%s", string); uDebug("processAlterTable :%s", string);
_exit: _exit:
cJSON_Delete(json); cJSON_Delete(json);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return string; return string;
@ -569,7 +569,7 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) {
string = cJSON_PrintUnformatted(json); string = cJSON_PrintUnformatted(json);
uDebug("processDropSTable :%s", string); uDebug("processDropSTable :%s", string);
_exit: _exit:
cJSON_Delete(json); cJSON_Delete(json);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return string; return string;
@ -609,7 +609,7 @@ static char* processDeleteTable(SMqMetaRsp* metaRsp) {
string = cJSON_PrintUnformatted(json); string = cJSON_PrintUnformatted(json);
uDebug("processDeleteTable :%s", string); uDebug("processDeleteTable :%s", string);
_exit: _exit:
cJSON_Delete(json); cJSON_Delete(json);
tDecoderClear(&coder); tDecoderClear(&coder);
return string; return string;
@ -652,7 +652,7 @@ static char* processDropTable(SMqMetaRsp* metaRsp) {
string = cJSON_PrintUnformatted(json); string = cJSON_PrintUnformatted(json);
uDebug("processDropTable :%s", string); uDebug("processDropTable :%s", string);
_exit: _exit:
cJSON_Delete(json); cJSON_Delete(json);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return string; return string;
@ -742,7 +742,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
code = pRequest->code; code = pRequest->code;
taosMemoryFree(pCmdMsg.pMsg); taosMemoryFree(pCmdMsg.pMsg);
end: end:
destroyRequest(pRequest); destroyRequest(pRequest);
tFreeSMCreateStbReq(&pReq); tFreeSMCreateStbReq(&pReq);
tDecoderClear(&coder); tDecoderClear(&coder);
@ -839,7 +839,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
code = pRequest->code; code = pRequest->code;
taosMemoryFree(pCmdMsg.pMsg); taosMemoryFree(pCmdMsg.pMsg);
end: end:
destroyRequest(pRequest); destroyRequest(pRequest);
tDecoderClear(&coder); tDecoderClear(&coder);
return code; return code;
@ -901,9 +901,9 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch); taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.requestObjRefId = pRequest->self, .requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName)); pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
// loop to create table // loop to create table
@ -987,7 +987,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
code = pRequest->code; code = pRequest->code;
end: end:
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq; pCreateReq = req.pReqs + iReq;
taosMemoryFreeClear(pCreateReq->comment); taosMemoryFreeClear(pCreateReq->comment);
@ -1058,9 +1058,9 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch); taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.requestObjRefId = pRequest->self, .requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName)); pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
// loop to create table // loop to create table
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
@ -1132,7 +1132,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
} }
code = pRequest->code; code = pRequest->code;
end: end:
taosHashCleanup(pVgroupHashmap); taosHashCleanup(pVgroupHashmap);
destroyRequest(pRequest); destroyRequest(pRequest);
tDecoderClear(&coder); tDecoderClear(&coder);
@ -1201,7 +1201,7 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
} }
taos_free_result(res); taos_free_result(res);
end: end:
tDecoderClear(&coder); tDecoderClear(&coder);
return code; return code;
} }
@ -1249,9 +1249,9 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
} }
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.requestObjRefId = pRequest->self, .requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
SVgroupInfo pInfo = {0}; SVgroupInfo pInfo = {0};
SName pName = {0}; SName pName = {0};
@ -1311,7 +1311,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
code = handleAlterTbExecRes(pRes->res, pCatalog); code = handleAlterTbExecRes(pRes->res, pCatalog);
} }
} }
end: end:
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
if (pVgData) taosMemoryFreeClear(pVgData->pData); if (pVgData) taosMemoryFreeClear(pVgData->pData);
taosMemoryFreeClear(pVgData); taosMemoryFreeClear(pVgData);
@ -1399,7 +1399,7 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
launchQueryImpl(pRequest, pQuery, true, NULL); launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code; code = pRequest->code;
end: end:
taosMemoryFreeClear(pTableMeta); taosMemoryFreeClear(pTableMeta);
qDestroyQuery(pQuery); qDestroyQuery(pQuery);
destroyRequest(pRequest); destroyRequest(pRequest);
@ -1481,7 +1481,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
launchQueryImpl(pRequest, pQuery, true, NULL); launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code; code = pRequest->code;
end: end:
taosMemoryFreeClear(pTableMeta); taosMemoryFreeClear(pTableMeta);
qDestroyQuery(pQuery); qDestroyQuery(pQuery);
destroyRequest(pRequest); destroyRequest(pRequest);
@ -1601,6 +1601,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
uError("WriteRaw:rawBlockBindData failed"); uError("WriteRaw:rawBlockBindData failed");
goto end; goto end;
} }
taosMemoryFreeClear(pTableMeta);
} }
code = smlBuildOutput(pQuery, pVgHash); code = smlBuildOutput(pQuery, pVgHash);
@ -1612,7 +1613,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
launchQueryImpl(pRequest, pQuery, true, NULL); launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code; code = pRequest->code;
end: end:
tDeleteSMqDataRsp(&rspObj.rsp); tDeleteSMqDataRsp(&rspObj.rsp);
tDecoderClear(&decoder); tDecoderClear(&decoder);
qDestroyQuery(pQuery); qDestroyQuery(pQuery);
@ -1707,6 +1708,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
tDecoderInit(&decoderTmp, *dataTmp, *lenTmp); tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) { if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) {
tDecoderClear(&decoderTmp); tDecoderClear(&decoderTmp);
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
uError("WriteRaw: tDecodeSVCreateTbReq error"); uError("WriteRaw: tDecodeSVCreateTbReq error");
code = TSDB_CODE_TMQ_INVALID_MSG; code = TSDB_CODE_TMQ_INVALID_MSG;
goto end; goto end;
@ -1715,15 +1717,19 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
if (pCreateReq.type != TSDB_CHILD_TABLE) { if (pCreateReq.type != TSDB_CHILD_TABLE) {
uError("WriteRaw:pCreateReq.type != TSDB_CHILD_TABLE. table name: %s", tbName); uError("WriteRaw:pCreateReq.type != TSDB_CHILD_TABLE. table name: %s", tbName);
code = TSDB_CODE_TSC_INVALID_VALUE; code = TSDB_CODE_TSC_INVALID_VALUE;
tDecoderClear(&decoderTmp);
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
goto end; goto end;
} }
if (strcmp(tbName, pCreateReq.name) == 0) { if (strcmp(tbName, pCreateReq.name) == 0) {
cloneSVreateTbReq(&pCreateReq, &pCreateReqDst); cloneSVreateTbReq(&pCreateReq, &pCreateReqDst);
// pCreateReqDst->ctb.suid = processSuid(pCreateReqDst->ctb.suid, pRequest->pDb); // pCreateReqDst->ctb.suid = processSuid(pCreateReqDst->ctb.suid, pRequest->pDb);
tDecoderClear(&decoderTmp); tDecoderClear(&decoderTmp);
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
break; break;
} }
tDecoderClear(&decoderTmp); tDecoderClear(&decoderTmp);
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
} }
SVgroupInfo vg; SVgroupInfo vg;
@ -1774,6 +1780,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
goto end; goto end;
} }
pCreateReqDst = NULL; pCreateReqDst = NULL;
taosMemoryFreeClear(pTableMeta);
} }
code = smlBuildOutput(pQuery, pVgHash); code = smlBuildOutput(pQuery, pVgHash);
@ -1785,7 +1792,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
launchQueryImpl(pRequest, pQuery, true, NULL); launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code; code = pRequest->code;
end: end:
tDeleteSTaosxRsp(&rspObj.rsp); tDeleteSTaosxRsp(&rspObj.rsp);
tDecoderClear(&decoder); tDecoderClear(&decoder);
qDestroyQuery(pQuery); qDestroyQuery(pQuery);

View File

@ -1116,6 +1116,7 @@ _failed:
} }
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
const int32_t MAX_RETRY_COUNT = 120 * 2; // let's wait for 2 mins at most
const SArray* container = &topic_list->container; const SArray* container = &topic_list->container;
int32_t sz = taosArrayGetSize(container); int32_t sz = taosArrayGetSize(container);
void* buf = NULL; void* buf = NULL;
@ -1209,7 +1210,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
int32_t retryCnt = 0; int32_t retryCnt = 0;
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) { while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
if (retryCnt++ > 40) { if (retryCnt++ > MAX_RETRY_COUNT) {
goto FAIL; goto FAIL;
} }
@ -1811,7 +1812,6 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
if (pRspWrapper == NULL) { if (pRspWrapper == NULL) {
taosReadAllQitems(tmq->mqueue, tmq->qall); taosReadAllQitems(tmq->mqueue, tmq->qall);
taosGetQitem(tmq->qall, (void**)&pRspWrapper); taosGetQitem(tmq->qall, (void**)&pRspWrapper);
if (pRspWrapper == NULL) { if (pRspWrapper == NULL) {
return NULL; return NULL;
} }
@ -1831,7 +1831,6 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp; SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp;
if (pDataRsp->head.epoch == consumerEpoch) { if (pDataRsp->head.epoch == consumerEpoch) {
// todo fix it: race condition
SMqClientVg* pVg = pollRspWrapper->vgHandle; SMqClientVg* pVg = pollRspWrapper->vgHandle;
// update the epset // update the epset
@ -1843,6 +1842,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
pVg->epSet = *pollRspWrapper->pEpset; pVg->epSet = *pollRspWrapper->pEpset;
} }
// update the local offset value only for the returned values.
pVg->currentOffset = pDataRsp->rspOffset; pVg->currentOffset = pDataRsp->rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

View File

@ -1180,7 +1180,7 @@ void blockDataCleanup(SSDataBlock* pDataBlock) {
void blockDataEmpty(SSDataBlock* pDataBlock) { void blockDataEmpty(SSDataBlock* pDataBlock) {
SDataBlockInfo* pInfo = &pDataBlock->info; SDataBlockInfo* pInfo = &pDataBlock->info;
if (pInfo->capacity == 0 || pInfo->rows > pDataBlock->info.capacity) { if (pInfo->capacity == 0) {
return; return;
} }

View File

@ -2439,6 +2439,12 @@ _exit:
int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes, int32_t nRows, char *lengthOrbitmap, int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes, int32_t nRows, char *lengthOrbitmap,
char *data) { char *data) {
int32_t code = 0; int32_t code = 0;
if(data == NULL){
for (int32_t i = 0; i < nRows; ++i) {
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0);
}
goto _exit;
}
if (IS_VAR_DATA_TYPE(type)) { // var-length data type if (IS_VAR_DATA_TYPE(type)) { // var-length data type
for (int32_t i = 0; i < nRows; ++i) { for (int32_t i = 0; i < nRows; ++i) {

View File

@ -178,7 +178,7 @@ typedef struct STsdbReader STsdbReader;
int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num); int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num);
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr); SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly);
void tsdbReaderSetId(STsdbReader* pReader, const char* idstr); void tsdbReaderSetId(STsdbReader* pReader, const char* idstr);
void tsdbReaderClose(STsdbReader *pReader); void tsdbReaderClose(STsdbReader *pReader);

View File

@ -109,23 +109,18 @@ typedef struct {
} STqPushEntry; } STqPushEntry;
struct STQ { struct STQ {
SVnode* pVnode; SVnode* pVnode;
char* path; char* path;
int64_t walLogLastVer; int64_t walLogLastVer;
SRWLatch lock;
SRWLatch pushLock; SHashObj* pPushMgr; // consumerId -> STqPushEntry
SHashObj* pHandle; // subKey -> STqHandle
SHashObj* pPushMgr; // consumerId -> STqPushEntry SHashObj* pCheckInfo; // topic -> SAlterCheckInfo
SHashObj* pHandle; // subKey -> STqHandle
SHashObj* pCheckInfo; // topic -> SAlterCheckInfo
STqOffsetStore* pOffsetStore; STqOffsetStore* pOffsetStore;
TDB* pMetaDB;
TDB* pMetaDB; TTB* pExecStore;
TTB* pExecStore; TTB* pCheckStore;
TTB* pCheckStore; SStreamMeta* pStreamMeta;
SStreamMeta* pStreamMeta;
}; };
typedef struct { typedef struct {
@ -144,8 +139,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum, uint64_t reqId); int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum, uint64_t reqId);
// tqExec // tqExec
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp); int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows);
// int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp);
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision); int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision);
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type);
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry); int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry);
@ -164,7 +158,7 @@ typedef struct {
int32_t size; int32_t size;
} STqOffsetHead; } STqOffsetHead;
STqOffsetStore* tqOffsetOpen(); STqOffsetStore* tqOffsetOpen(STQ* pTq);
void tqOffsetClose(STqOffsetStore*); void tqOffsetClose(STqOffsetStore*);
STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey); STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey);
int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset); int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset);

View File

@ -224,6 +224,8 @@ int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward,
void *tsdbTbDataIterDestroy(STbDataIter *pIter); void *tsdbTbDataIterDestroy(STbDataIter *pIter);
void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter); void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter);
bool tsdbTbDataIterNext(STbDataIter *pIter); bool tsdbTbDataIterNext(STbDataIter *pIter);
void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj* pTableMap, int64_t *rowsNum);
// STbData // STbData
int32_t tsdbGetNRowsInTbData(STbData *pTbData); int32_t tsdbGetNRowsInTbData(STbData *pTbData);
// tsdbFile.c ============================================================================================== // tsdbFile.c ==============================================================================================

View File

@ -51,7 +51,7 @@ void tqCleanUp() {
} }
} }
static void destroySTqHandle(void* data) { static void destroyTqHandle(void* data) {
STqHandle* pData = (STqHandle*)data; STqHandle* pData = (STqHandle*)data;
qDestroyTask(pData->execHandle.task); qDestroyTask(pData->execHandle.task);
if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
@ -89,9 +89,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
pTq->walLogLastVer = pVnode->pWal->vers.lastVer; pTq->walLogLastVer = pVnode->pWal->vers.lastVer;
pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
taosHashSetFreeFp(pTq->pHandle, destroySTqHandle); taosHashSetFreeFp(pTq->pHandle, destroyTqHandle);
taosInitRWLatch(&pTq->pushLock); taosInitRWLatch(&pTq->lock);
pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
taosHashSetFreeFp(pTq->pPushMgr, tqPushEntryFree); taosHashSetFreeFp(pTq->pPushMgr, tqPushEntryFree);
@ -236,38 +236,6 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
} }
#endif #endif
// int32_t len = 0;
// int32_t code = 0;
// tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
// if (code < 0) {
// return -1;
// }
//
// int32_t tlen = sizeof(SMqRspHead) + len;
// void* buf = rpcMallocCont(tlen);
// if (buf == NULL) {
// return -1;
// }
//
// memcpy(buf, &pPushEntry->dataRsp.head, sizeof(SMqRspHead));
//
// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
//
// SEncoder encoder = {0};
// tEncoderInit(&encoder, abuf, len);
// tEncodeSMqDataRsp(&encoder, pRsp);
// tEncoderClear(&encoder);
//
// SRpcMsg rsp = {
// .info = pPushEntry->pInfo,
// .pCont = buf,
// .contLen = tlen,
// .code = 0,
// };
//
// tmsgSendRsp(&rsp);
//
SMqRspHead* pHeader = &pPushEntry->pDataRsp->head; SMqRspHead* pHeader = &pPushEntry->pDataRsp->head;
doSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType); doSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType);
@ -444,7 +412,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
char formatBuf[80]; char formatBuf[80];
tFormatOffset(formatBuf, 80, pOffsetVal); tFormatOffset(formatBuf, 80, pOffsetVal);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, prev offset found, offset reset to %s and continue.", tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.",
consumerId, pHandle->subKey, vgId, formatBuf); consumerId, pHandle->subKey, vgId, formatBuf);
return 0; return 0;
} else { } else {
@ -502,7 +470,45 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
return 0; return 0;
} }
static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) { #define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
int32_t code = 0;
uint64_t consumerId = pRequest->consumerId;
int32_t vgId = TD_VID(pTq->pVnode);
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
// lock
taosWLockLatch(&pTq->lock);
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
taosWUnLockLatch(&pTq->lock);
return code;
}
taosWUnLockLatch(&pTq->lock);
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
// NOTE: this pHandle->consumerId may have been changed already.
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64
", ts:%" PRId64 ", reqId:0x%" PRIx64,
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid,
dataRsp.rspOffset.ts, pRequest->reqId);
tDeleteSMqDataRsp(&dataRsp);
return code;
}
static int32_t doPollDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
int32_t code = -1; int32_t code = -1;
STqOffsetVal offset = {0}; STqOffsetVal offset = {0};
SWalCkHead* pCkHead = NULL; SWalCkHead* pCkHead = NULL;
@ -512,9 +518,8 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
uint64_t consumerId = pRequest->consumerId; uint64_t consumerId = pRequest->consumerId;
// 1. reset the offset if needed // 1. reset the offset if needed
if (reqOffset.type > 0) { if (IS_OFFSET_RESET_TYPE(reqOffset.type)) {
offset = reqOffset; // handle the reset offset cases, according to the consumer's choice.
} else { // handle the reset offset cases, according to the consumer's choice.
bool blockReturned = false; bool blockReturned = false;
code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest, pMsg, &blockReturned); code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest, pMsg, &blockReturned);
if (code != 0) { if (code != 0) {
@ -525,38 +530,14 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
if (blockReturned) { if (blockReturned) {
return 0; return 0;
} }
} else { // use the consumer specified offset
// the offset value can not be monotonious increase??
offset = reqOffset;
} }
// this is a normal subscription requirement // this is a normal subscribe requirement
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
SMqDataRsp dataRsp = {0}; return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset);
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
// lock
taosWLockLatch(&pTq->pushLock);
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
code = tqScanData(pTq, pHandle, &dataRsp, &offset);
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
taosWUnLockLatch(&pTq->pushLock);
return code;
}
taosWUnLockLatch(&pTq->pushLock);
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
// NOTE: this pHandle->consumerId may have been changed already.
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64
", ts:%" PRId64", reqId:0x%"PRIx64,
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid,
dataRsp.rspOffset.ts, pRequest->reqId);
tDeleteSMqDataRsp(&dataRsp);
return code;
} }
// todo handle the case where re-balance occurs. // todo handle the case where re-balance occurs.
@ -573,7 +554,7 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
if (metaRsp.metaRspLen > 0) { if (metaRsp.metaRspLen > 0) {
code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp); code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp);
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64
",ts:%" PRId64, ",ts:%" PRId64,
consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
metaRsp.rspOffset.ts); metaRsp.rspOffset.ts);
taosMemoryFree(metaRsp.metaRsp); taosMemoryFree(metaRsp.metaRsp);
@ -590,14 +571,12 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
} }
tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64 tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
",version:%" PRId64, ",version:%" PRId64,
consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,
taosxRsp.rspOffset.version); taosxRsp.rspOffset.version);
} }
if (offset.type == TMQ_OFFSET__LOG) { if (offset.type == TMQ_OFFSET__LOG) {
// if (offset.type == TMQ_OFFSET__LOG) {
int64_t fetchVer = offset.version + 1; int64_t fetchVer = offset.version + 1;
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) { if (pCkHead == NULL) {
@ -605,15 +584,14 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
walSetReaderCapacity(pHandle->pWalReader, 2048); walSetReaderCapacity(pHandle->pWalReader, 2048);
int totalRows = 0;
while (1) { while (1) {
// todo refactor: this is not correct. // todo refactor: this is not correct.
int32_t savedEpoch = atomic_load_32(&pHandle->epoch); int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
if (savedEpoch > pRequest->epoch) { if (savedEpoch > pRequest->epoch) {
tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64 tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64
", found new consumer epoch %d, discard req epoch %d", ", found new consumer epoch %d, discard req epoch %d",
consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch); consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch);
break; break;
} }
@ -630,32 +608,16 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId, tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId,
pRequest->epoch, vgId, fetchVer, pHead->msgType); pRequest->epoch, vgId, fetchVer, pHead->msgType);
if (pHead->msgType == TDMT_VND_SUBMIT) { // process meta
SPackedData submit = { if (pHead->msgType != TDMT_VND_SUBMIT) {
.msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)), if(totalRows > 0) {
.msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg), tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1);
.ver = pHead->version,
};
if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) {
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, vgId,
pRequest->subKey);
return -1;
}
if (taosxRsp.blockNum > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp); tDeleteSTaosxRsp(&taosxRsp);
taosMemoryFreeClear(pCkHead); taosMemoryFreeClear(pCkHead);
return code; return code;
} else {
fetchVer++;
} }
} else {
/*A(pHandle->fetchMeta);*/
/*A(IS_META_MSG(pHead->msgType));*/
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType)); tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType));
tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer); tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer);
metaRsp.resMsgType = pHead->msgType; metaRsp.resMsgType = pHead->msgType;
@ -672,6 +634,31 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
tDeleteSTaosxRsp(&taosxRsp); tDeleteSTaosxRsp(&taosxRsp);
return code; return code;
} }
// process data
SPackedData submit = {
.msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)),
.msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
.ver = pHead->version,
};
if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows) < 0) {
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, vgId,
pRequest->subKey);
taosMemoryFreeClear(pCkHead);
tDeleteSTaosxRsp(&taosxRsp);
return -1;
}
if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp);
taosMemoryFreeClear(pCkHead);
return code;
} else {
fetchVer++;
}
} }
} }
@ -702,31 +689,31 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
} }
// 2. check re-balance status // 2. check re-balance status
taosRLockLatch(&pTq->pushLock); taosRLockLatch(&pTq->lock);
if (pHandle->consumerId != consumerId) { if (pHandle->consumerId != consumerId) {
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
taosRUnLockLatch(&pTq->pushLock); taosRUnLockLatch(&pTq->lock);
return -1; return -1;
} }
taosRUnLockLatch(&pTq->pushLock); taosRUnLockLatch(&pTq->lock);
taosWLockLatch(&pTq->pushLock);
// 3. update the epoch value // 3. update the epoch value
taosWLockLatch(&pTq->lock);
int32_t savedEpoch = pHandle->epoch; int32_t savedEpoch = pHandle->epoch;
if (savedEpoch < reqEpoch) { if (savedEpoch < reqEpoch) {
tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch); tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch);
pHandle->epoch = reqEpoch; pHandle->epoch = reqEpoch;
} }
taosWUnLockLatch(&pTq->pushLock); taosWUnLockLatch(&pTq->lock);
char buf[80]; char buf[80];
tFormatOffset(buf, 80, &reqOffset); tFormatOffset(buf, 80, &reqOffset);
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64, tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId); consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
return extractDataForMq(pTq, pHandle, &req, pMsg); return doPollDataForMq(pTq, pHandle, &req, pMsg);
} }
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
@ -734,12 +721,12 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey); tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey);
taosWLockLatch(&pTq->pushLock); taosWLockLatch(&pTq->lock);
int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey)); int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey));
if (code != 0) { if (code != 0) {
tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey); tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey);
} }
taosWUnLockLatch(&pTq->pushLock); taosWUnLockLatch(&pTq->lock);
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
if (pHandle) { if (pHandle) {
@ -803,18 +790,18 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
SVnode* pVnode = pTq->pVnode; SVnode* pVnode = pTq->pVnode;
int32_t vgId = TD_VID(pVnode); int32_t vgId = TD_VID(pVnode);
tqDebug("vgId:%d, tq process sub req %s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey, tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
req.oldConsumerId, req.newConsumerId); req.oldConsumerId, req.newConsumerId);
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) { if (pHandle == NULL) {
if (req.oldConsumerId != -1) { if (req.oldConsumerId != -1) {
tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId is %" PRId64 "", tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId); req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
} }
if (req.newConsumerId == -1) { if (req.newConsumerId == -1) {
tqError("vgId:%d, tq invalid rebalance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId); tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
taosMemoryFree(req.qmsg); taosMemoryFree(req.qmsg);
return 0; return 0;
} }
@ -904,28 +891,28 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
atomic_add_fetch_32(&pHandle->epoch, 1); atomic_add_fetch_32(&pHandle->epoch, 1);
taosMemoryFree(req.qmsg); taosMemoryFree(req.qmsg);
return tqMetaSaveHandle(pTq, req.subKey, pHandle); return tqMetaSaveHandle(pTq, req.subKey, pHandle);
} } else {
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
req.newConsumerId);
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, taosWLockLatch(&pTq->lock);
req.newConsumerId); atomic_store_32(&pHandle->epoch, -1);
taosWLockLatch(&pTq->pushLock); // remove if it has been register in the push manager, and return one empty block to consumer
atomic_store_32(&pHandle->epoch, -1); tqRemovePushEntry(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true);
// remove if it has been register in the push manager, and return one empty block to consumer atomic_store_64(&pHandle->consumerId, req.newConsumerId);
tqRemovePushEntry(pTq, req.subKey, (int32_t) strlen(req.subKey), pHandle->consumerId, true); atomic_add_fetch_32(&pHandle->epoch, 1);
atomic_store_64(&pHandle->consumerId, req.newConsumerId); if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
atomic_add_fetch_32(&pHandle->epoch, 1); qStreamCloseTsdbReader(pHandle->execHandle.task);
}
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { taosWUnLockLatch(&pTq->lock);
qStreamCloseTsdbReader(pHandle->execHandle.task); if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
} taosMemoryFree(req.qmsg);
return -1;
taosWUnLockLatch(&pTq->pushLock); }
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
taosMemoryFree(req.qmsg);
return -1;
} }
} }

View File

@ -230,23 +230,15 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
return 0; return 0;
} }
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp) { int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows) {
STqExecHandle* pExec = &pHandle->execHandle; STqExecHandle* pExec = &pHandle->execHandle;
/*A(pExec->subType != TOPIC_SUB_TYPE__COLUMN);*/
SArray* pBlocks = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
SArray* pSchemas = taosArrayInit(0, sizeof(void*)); SArray* pSchemas = taosArrayInit(0, sizeof(void*));
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
STqReader* pReader = pExec->pExecReader; STqReader* pReader = pExec->pExecReader;
/*tqReaderSetDataMsg(pReader, pReq, 0);*/
tqReaderSetSubmitReq2(pReader, submit.msgStr, submit.msgLen, submit.ver); tqReaderSetSubmitReq2(pReader, submit.msgStr, submit.msgLen, submit.ver);
while (tqNextDataBlock2(pReader)) { while (tqNextDataBlock2(pReader)) {
/*SSDataBlock block = {0};*/
/*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/
/*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/
/*}*/
taosArrayClear(pBlocks); taosArrayClear(pBlocks);
taosArrayClear(pSchemas); taosArrayClear(pSchemas);
SSubmitTbData* pSubmitTbDataRet = NULL; SSubmitTbData* pSubmitTbDataRet = NULL;
@ -254,7 +246,6 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
} }
if (pRsp->withTbName) { if (pRsp->withTbName) {
/*int64_t uid = pExec->pExecReader->msgIter.uid;*/
int64_t uid = pExec->pExecReader->lastBlkUid; int64_t uid = pExec->pExecReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) { if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes); taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
@ -296,6 +287,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
SSDataBlock* pBlock = taosArrayGet(pBlocks, i); SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock), tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
pTq->pVnode->config.tsdbCfg.precision); pTq->pVnode->config.tsdbCfg.precision);
totalRows += pBlock->info.rows;
blockDataFreeRes(pBlock); blockDataFreeRes(pBlock);
SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i); SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
taosArrayPush(pRsp->blockSchema, &pSW); taosArrayPush(pRsp->blockSchema, &pSW);
@ -304,13 +296,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
} }
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) { } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
STqReader* pReader = pExec->pExecReader; STqReader* pReader = pExec->pExecReader;
/*tqReaderSetDataMsg(pReader, pReq, 0);*/
tqReaderSetSubmitReq2(pReader, submit.msgStr, submit.msgLen, submit.ver); tqReaderSetSubmitReq2(pReader, submit.msgStr, submit.msgLen, submit.ver);
while (tqNextDataBlockFilterOut2(pReader, pExec->execDb.pFilterOutTbUid)) { while (tqNextDataBlockFilterOut2(pReader, pExec->execDb.pFilterOutTbUid)) {
/*SSDataBlock block = {0};*/
/*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/
/*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/
/*}*/
taosArrayClear(pBlocks); taosArrayClear(pBlocks);
taosArrayClear(pSchemas); taosArrayClear(pSchemas);
SSubmitTbData* pSubmitTbDataRet = NULL; SSubmitTbData* pSubmitTbDataRet = NULL;
@ -355,15 +342,11 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
tEncoderClear(&encoder); tEncoderClear(&encoder);
} }
/*tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock),*/
/*pTq->pVnode->config.tsdbCfg.precision);*/
/*blockDataFreeRes(&block);*/
/*tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp);*/
/*pRsp->blockNum++;*/
for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) { for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
SSDataBlock* pBlock = taosArrayGet(pBlocks, i); SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock), tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
pTq->pVnode->config.tsdbCfg.precision); pTq->pVnode->config.tsdbCfg.precision);
*totalRows += pBlock->info.rows;
blockDataFreeRes(pBlock); blockDataFreeRes(pBlock);
SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i); SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
taosArrayPush(pRsp->blockSchema, &pSW); taosArrayPush(pRsp->blockSchema, &pSW);
@ -373,9 +356,5 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
} }
taosArrayDestroy(pBlocks); taosArrayDestroy(pBlocks);
taosArrayDestroy(pSchemas); taosArrayDestroy(pSchemas);
// if (pRsp->blockNum == 0) {
// return -1;
// }
return 0; return 0;
} }

View File

@ -213,7 +213,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
if (msgType == TDMT_VND_SUBMIT) { if (msgType == TDMT_VND_SUBMIT) {
// lock push mgr to avoid potential msg lost // lock push mgr to avoid potential msg lost
taosWLockLatch(&pTq->pushLock); taosWLockLatch(&pTq->lock);
int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr); int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr);
if (numOfRegisteredPush > 0) { if (numOfRegisteredPush > 0) {
@ -231,7 +231,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
taosArrayDestroy(cachedKeyLens); taosArrayDestroy(cachedKeyLens);
// unlock // unlock
taosWUnLockLatch(&pTq->pushLock); taosWUnLockLatch(&pTq->lock);
return -1; return -1;
} }
@ -320,7 +320,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
taosMemoryFree(data); taosMemoryFree(data);
} }
// unlock // unlock
taosWUnLockLatch(&pTq->pushLock); taosWUnLockLatch(&pTq->lock);
} }
if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode)) { if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode)) {

View File

@ -282,6 +282,34 @@ bool tsdbTbDataIterNext(STbDataIter *pIter) {
return true; return true;
} }
int64_t tsdbCountTbDataRows(STbData *pTbData) {
SMemSkipListNode *pNode = NULL;
int64_t rowsNum = 0;
while (true) {
pNode = SL_GET_NODE_FORWARD(pTbData->sl.pHead, 0);
if (pNode == pTbData->sl.pTail) {
return rowsNum;
}
rowsNum++;
}
}
void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj* pTableMap, int64_t *rowsNum) {
taosRLockLatch(&pMemTable->latch);
for (int32_t i = 0; i < pMemTable->nBucket; ++i) {
STbData *pTbData = pMemTable->aBucket[i];
void* p = taosHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid));
if (p == NULL) {
continue;
}
rowsNum += tsdbCountTbDataRows(pTbData);
}
taosRUnLockLatch(&pMemTable->latch);
}
static int32_t tsdbMemTableRehash(SMemTable *pMemTable) { static int32_t tsdbMemTableRehash(SMemTable *pMemTable) {
int32_t code = 0; int32_t code = 0;
@ -787,4 +815,4 @@ SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable) {
_exit: _exit:
return aTbDataP; return aTbDataP;
} }

View File

@ -24,6 +24,11 @@ typedef enum {
EXTERNAL_ROWS_NEXT = 0x3, EXTERNAL_ROWS_NEXT = 0x3,
} EContentData; } EContentData;
typedef enum {
READ_MODE_COUNT_ONLY = 0x1,
READ_MODE_ALL,
} EReadMode;
typedef struct { typedef struct {
STbDataIter* iter; STbDataIter* iter;
int32_t index; int32_t index;
@ -167,6 +172,8 @@ struct STsdbReader {
uint64_t suid; uint64_t suid;
int16_t order; int16_t order;
bool freeBlock; bool freeBlock;
EReadMode readMode;
uint64_t rowsNum;
STimeWindow window; // the primary query time window that applies to all queries STimeWindow window; // the primary query time window that applies to all queries
SSDataBlock* pResBlock; SSDataBlock* pResBlock;
int32_t capacity; int32_t capacity;
@ -2998,6 +3005,9 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader); TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);
if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) { if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) {
if (READ_MODE_COUNT_ONLY == pReader->readMode && pReader->rowsNum > 0) {
return code;
}
code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid); code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
@ -3006,12 +3016,19 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
// build composed data block // build composed data block
code = buildComposedDataBlock(pReader); code = buildComposedDataBlock(pReader);
} else if (bufferDataInFileBlockGap(pReader->order, keyInBuf, pBlock)) { } else if (bufferDataInFileBlockGap(pReader->order, keyInBuf, pBlock)) {
if (READ_MODE_COUNT_ONLY == pReader->readMode && pReader->rowsNum > 0) {
return code;
}
// data in memory that are earlier than current file block // data in memory that are earlier than current file block
// rows in buffer should be less than the file block in asc, greater than file block in desc // rows in buffer should be less than the file block in asc, greater than file block in desc
int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts; int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts;
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey); code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
} else { } else {
if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) { if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) {
if (READ_MODE_COUNT_ONLY == pReader->readMode && pReader->rowsNum > 0) {
return code;
}
// only return the rows in last block // only return the rows in last block
int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader); int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
ASSERT(tsLast >= pBlock->maxKey.ts); ASSERT(tsLast >= pBlock->maxKey.ts);
@ -3069,6 +3086,151 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
return code; return code;
} }
static int32_t doSumBlockRows(STsdbReader* pReader, SDataFReader* pFileReader) {
int64_t st = taosGetTimestampUs();
LRUHandle* handle = NULL;
int32_t code = tsdbCacheGetBlockIdx(pFileReader->pTsdb->biCache, pFileReader, &handle);
if (code != TSDB_CODE_SUCCESS || handle == NULL) {
goto _end;
}
int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle);
size_t num = taosArrayGetSize(aBlockIdx);
if (num == 0) {
tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle);
return TSDB_CODE_SUCCESS;
}
SBlockIdx* pBlockIdx = NULL;
int32_t i = 0;
for (int32_t i = 0; i < num; ++i) {
pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);
if (pBlockIdx->suid != pReader->suid) {
continue;
}
STableBlockScanInfo** p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(pBlockIdx->uid));
if (p == NULL) {
continue;
}
STableBlockScanInfo *pScanInfo = *p;
tMapDataReset(&pScanInfo->mapData);
tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
SDataBlk block = {0};
for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block);
pReader->rowsNum += block.nRow;
}
}
_end:
tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle);
return code;
}
static int32_t readRowsCountFromFile(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
while (1) {
bool hasNext = false;
int32_t code = filesetIteratorNext(&pReader->status.fileIter, pReader, &hasNext);
if (code) {
return code;
}
if (!hasNext) { // no data files on disk
break;
}
code = doSumBlockRows(pReader, pReader->pFileReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
pReader->status.loadFromFile = false;
return code;
}
static int32_t readRowsCountFromStt(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
SSttBlockLoadInfo* pBlockLoadInfo = NULL;
for (int32_t i = 0; i < pReader->pFileReader->pSet->nSttF; ++i) { // open all last file
pBlockLoadInfo = &pLastBlockReader->pInfo[i];
if (!pLastBlockReader->pInfo[i].sttBlockLoaded) {
pLastBlockReader->pInfo[i].sttBlockLoaded = true;
code = tsdbReadSttBlk(pReader->pFileReader, i, pBlockLoadInfo->aSttBlk);
if (code) {
return code;
}
}
size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
if (size >= 1) {
SSttBlk *pStart = taosArrayGet(pBlockLoadInfo->aSttBlk, 0);
SSttBlk *pEnd = taosArrayGet(pBlockLoadInfo->aSttBlk, size - 1);
// all identical
if (pStart->suid == pEnd->suid) {
if (pStart->suid != pReader->suid) {
// no qualified stt block existed
taosArrayClear(pBlockLoadInfo->aSttBlk);
continue;
}
for (int32_t i = 0; i < size; ++i) {
SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
pReader->rowsNum += p->nRow;
}
} else {
for (int32_t i = 0; i < size; ++i) {
SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
uint64_t s = p->suid;
if (s < pReader->suid) {
continue;
}
if (s == pReader->suid) {
pReader->rowsNum += p->nRow;
} else if (s > pReader->suid) {
break;
}
}
}
}
}
return code;
}
static int32_t readRowsCountFromMem(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t memNum = 0, imemNum = 0;
if (pReader->pReadSnap->pMem != NULL) {
tsdbMemTableCountRows(pReader->pReadSnap->pMem, pReader->status.pTableMap, &memNum);
}
if (pReader->pReadSnap->pIMem != NULL) {
tsdbMemTableCountRows(pReader->pReadSnap->pIMem, pReader->status.pTableMap, &imemNum);
}
pReader->rowsNum += memNum + imemNum;
return code;
}
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
STableUidList* pUidList = &pStatus->uidList; STableUidList* pUidList = &pStatus->uidList;
@ -3212,6 +3374,12 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
initBlockDumpInfo(pReader, pBlockIter); initBlockDumpInfo(pReader, pBlockIter);
} else { } else {
if (pReader->status.pCurrentFileset->nSttF > 0) { if (pReader->status.pCurrentFileset->nSttF > 0) {
if (READ_MODE_COUNT_ONLY == pReader->readMode && pReader->rowsNum > 0) {
pReader->pResBlock->info.rows = pReader->rowsNum;
pReader->rowsNum = 0;
return TSDB_CODE_SUCCESS;
}
// data blocks in current file are exhausted, let's try the next file now // data blocks in current file are exhausted, let's try the next file now
SBlockData* pBlockData = &pReader->status.fileBlockData; SBlockData* pBlockData = &pReader->status.fileBlockData;
if (pBlockData->uid != 0) { if (pBlockData->uid != 0) {
@ -3226,7 +3394,17 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
code = initForFirstBlockInFile(pReader, pBlockIter); code = initForFirstBlockInFile(pReader, pBlockIter);
// error happens or all the data files are completely checked // error happens or all the data files are completely checked
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (READ_MODE_COUNT_ONLY == pReader->readMode && pReader->rowsNum > 0) {
pReader->pResBlock->info.rows = pReader->rowsNum;
pReader->rowsNum = 0;
return TSDB_CODE_SUCCESS;
}
if (pReader->status.loadFromFile == false) {
return code; return code;
} }
@ -3240,6 +3418,17 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
} }
code = doBuildDataBlock(pReader); code = doBuildDataBlock(pReader);
if (READ_MODE_COUNT_ONLY == pReader->readMode) {
if (false == pReader->status.composedDataBlock && pDumpInfo->allDumped) {
pReader->rowsNum += pReader->pResBlock->info.rows;
pReader->pResBlock->info.rows = 0;
continue;
} else if (pReader->pResBlock->info.rows == 0 && pReader->rowsNum > 0) {
pReader->pResBlock->info.rows = pReader->rowsNum;
pReader->rowsNum = 0;
return TSDB_CODE_SUCCESS;
}
}
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -3986,6 +4175,8 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (pStatus->fileIter.numOfFiles == 0) { if (pStatus->fileIter.numOfFiles == 0) {
pStatus->loadFromFile = false; pStatus->loadFromFile = false;
} else if (READ_MODE_COUNT_ONLY == pReader->readMode) {
// DO NOTHING
} else { } else {
code = initForFirstBlockInFile(pReader, pBlockIter); code = initForFirstBlockInFile(pReader, pBlockIter);
} }
@ -3999,7 +4190,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
// ====================================== EXPOSED APIs ====================================== // ====================================== EXPOSED APIs ======================================
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables, int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr) { SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr, bool countOnly) {
STimeWindow window = pCond->twindows; STimeWindow window = pCond->twindows;
if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) { if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
pCond->twindows.skey += 1; pCond->twindows.skey += 1;
@ -4091,6 +4282,10 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
pReader->suspended = true; pReader->suspended = true;
if (countOnly) {
pReader->readMode = READ_MODE_COUNT_ONLY;
}
tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr); tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
return code; return code;
@ -4394,6 +4589,40 @@ _err:
return code; return code;
} }
static bool tsdbReadRowsCountOnly(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
SSDataBlock* pBlock = pReader->pResBlock;
while (1) {
if (pReader->status.loadFromFile == false) {
break;
}
code = readRowsCountFromFile(pReader);
if (code != TSDB_CODE_SUCCESS) {
return false;
}
code = readRowsCountFromStt(pReader);
if (code != TSDB_CODE_SUCCESS) {
return false;
}
}
code = readRowsCountFromMem(pReader);
if (code != TSDB_CODE_SUCCESS) {
return false;
}
pBlock->info.rows = pReader->rowsNum;
pBlock->info.id.uid = 0;
pBlock->info.dataLoad = 0;
pReader->rowsNum = 0;
return pBlock->info.rows > 0;
}
static bool doTsdbNextDataBlock(STsdbReader* pReader) { static bool doTsdbNextDataBlock(STsdbReader* pReader) {
// cleanup the data that belongs to the previous data block // cleanup the data that belongs to the previous data block
SSDataBlock* pBlock = pReader->pResBlock; SSDataBlock* pBlock = pReader->pResBlock;
@ -4404,6 +4633,10 @@ static bool doTsdbNextDataBlock(STsdbReader* pReader) {
return false; return false;
} }
if (READ_MODE_COUNT_ONLY == pReader->readMode) {
return tsdbReadRowsCountOnly(pReader);
}
if (pStatus->loadFromFile) { if (pStatus->loadFromFile) {
int32_t code = buildBlockFromFiles(pReader); int32_t code = buildBlockFromFiles(pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -4507,6 +4740,7 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_
int32_t i = 0, j = 0; int32_t i = 0, j = 0;
int32_t size = (int32_t)taosArrayGetSize(pSup->pColAgg); int32_t size = (int32_t)taosArrayGetSize(pSup->pColAgg);
taosArrayInsert(pSup->pColAgg, 0, pTsAgg); taosArrayInsert(pSup->pColAgg, 0, pTsAgg);
size++;
while (j < numOfCols && i < size) { while (j < numOfCols && i < size) {
SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i); SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
@ -4519,10 +4753,21 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_
if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) { if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows}; SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows};
taosArrayInsert(pSup->pColAgg, i, &nullColAgg); taosArrayInsert(pSup->pColAgg, i, &nullColAgg);
i += 1;
size++;
} }
j += 1; j += 1;
} }
} }
while (j < numOfCols) {
if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows};
taosArrayInsert(pSup->pColAgg, i, &nullColAgg);
i += 1;
}
j++;
}
} }
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave) { int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave) {
@ -4602,8 +4847,8 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock,
} else if (pAgg->colId < pSup->colId[j]) { } else if (pAgg->colId < pSup->colId[j]) {
i += 1; i += 1;
} else if (pSup->colId[j] < pAgg->colId) { } else if (pSup->colId[j] < pAgg->colId) {
// ASSERT(pSup->colId[j] == PRIMARYKEY_TIMESTAMP_COL_ID); pResBlock->pBlockAgg[pSup->slotId[j]] = NULL;
pResBlock->pBlockAgg[pSup->slotId[j]] = &pSup->tsColAgg; *allHave = false;
j += 1; j += 1;
} }
} }
@ -4996,4 +5241,4 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proacti
void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) { void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) {
taosMemoryFreeClear(pReader->idStr); taosMemoryFreeClear(pReader->idStr);
pReader->idStr = taosStrdup(idstr); pReader->idStr = taosStrdup(idstr);
} }

View File

@ -339,6 +339,7 @@ typedef struct STableScanInfo {
int8_t scanMode; int8_t scanMode;
int8_t assignBlockUid; int8_t assignBlockUid;
bool hasGroupByTag; bool hasGroupByTag;
bool countOnly;
} STableScanInfo; } STableScanInfo;
typedef struct STableMergeScanInfo { typedef struct STableMergeScanInfo {
@ -559,6 +560,7 @@ typedef struct SStreamIntervalOperatorInfo {
STimeWindowAggSupp twAggSup; STimeWindowAggSupp twAggSup;
bool invertible; bool invertible;
bool ignoreExpiredData; bool ignoreExpiredData;
bool ignoreExpiredDataSaved;
SArray* pDelWins; // SWinRes SArray* pDelWins; // SWinRes
int32_t delIndex; int32_t delIndex;
SSDataBlock* pDelRes; SSDataBlock* pDelRes;
@ -620,6 +622,7 @@ typedef struct SStreamSessionAggOperatorInfo {
SPhysiNode* pPhyNode; // create new child SPhysiNode* pPhyNode; // create new child
bool isFinal; bool isFinal;
bool ignoreExpiredData; bool ignoreExpiredData;
bool ignoreExpiredDataSaved;
SArray* pUpdated; SArray* pUpdated;
SSHashObj* pStUpdated; SSHashObj* pStUpdated;
} SStreamSessionAggOperatorInfo; } SStreamSessionAggOperatorInfo;
@ -637,6 +640,7 @@ typedef struct SStreamStateAggOperatorInfo {
void* pDelIterator; void* pDelIterator;
SArray* pChildren; // cache for children's result; SArray* pChildren; // cache for children's result;
bool ignoreExpiredData; bool ignoreExpiredData;
bool ignoreExpiredDataSaved;
SArray* pUpdated; SArray* pUpdated;
SSHashObj* pSeUpdated; SSHashObj* pSeUpdated;
} SStreamStateAggOperatorInfo; } SStreamStateAggOperatorInfo;

View File

@ -92,8 +92,8 @@ typedef struct SResultRowData {
typedef struct SStreamFillLinearInfo { typedef struct SStreamFillLinearInfo {
TSKEY nextEnd; TSKEY nextEnd;
SArray* pDeltaVal; // double. value for Fill(linear). SArray* pEndPoints;
SArray* pNextDeltaVal; // double. value for Fill(linear). SArray* pNextEndPoints;
int64_t winIndex; int64_t winIndex;
bool hasNext; bool hasNext;
} SStreamFillLinearInfo; } SStreamFillLinearInfo;

View File

@ -2042,7 +2042,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
void printDataBlock(SSDataBlock* pBlock, const char* flag) { void printDataBlock(SSDataBlock* pBlock, const char* flag) {
if (!pBlock || pBlock->info.rows == 0) { if (!pBlock || pBlock->info.rows == 0) {
qDebug("===stream===printDataBlock: Block is Null or Empty"); qDebug("===stream===%s: Block is Null or Empty", flag);
return; return;
} }
char* pBuf = NULL; char* pBuf = NULL;

View File

@ -888,7 +888,8 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark; pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark;
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
pInfo->twAggSup.deleteMark = INT64_MAX; pInfo->twAggSup.deleteMark = INT64_MAX;
pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
pInfo->ignoreExpiredData = false;
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
@ -904,6 +905,8 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark; pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark;
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
pInfo->twAggSup.deleteMark = INT64_MAX; pInfo->twAggSup.deleteMark = INT64_MAX;
pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
pInfo->ignoreExpiredData = false;
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info; SStreamStateAggOperatorInfo* pInfo = pOperator->info;
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
@ -917,6 +920,8 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark; pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark;
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
pInfo->twAggSup.deleteMark = INT64_MAX; pInfo->twAggSup.deleteMark = INT64_MAX;
pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
pInfo->ignoreExpiredData = false;
} }
// iterate operator tree // iterate operator tree
@ -944,35 +949,23 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) {
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL || pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) { pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info; SStreamIntervalOperatorInfo* pInfo = pOperator->info;
/*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);*/
/*ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);*/
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved; pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved; pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
/*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||*/ pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
/*pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);*/
qInfo("restore stream param for interval: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); qInfo("restore stream param for interval: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
/*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);*/
/*ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);*/
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved; pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved; pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
/*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||*/ pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
/*pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);*/
qInfo("restore stream param for session: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); qInfo("restore stream param for session: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info; SStreamStateAggOperatorInfo* pInfo = pOperator->info;
/*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);*/
/*ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);*/
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved; pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved; pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
/*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||*/ pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
/*pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);*/
qInfo("restore stream param for state: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); qInfo("restore stream param for state: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
} }
@ -1147,7 +1140,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
int32_t num = tableListGetSize(pTaskInfo->pTableInfoList); int32_t num = tableListGetSize(pTaskInfo->pTableInfoList);
if (tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &pTableScanInfo->base.cond, pList, num, if (tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &pTableScanInfo->base.cond, pList, num,
pTableScanInfo->pResBlock, &pTableScanInfo->base.dataReader, NULL) < 0 || pTableScanInfo->pResBlock, &pTableScanInfo->base.dataReader, NULL, false) < 0 ||
pTableScanInfo->base.dataReader == NULL) { pTableScanInfo->base.dataReader == NULL) {
qError("tsdbReaderOpen failed. uid:%" PRIi64, pOffset->uid); qError("tsdbReaderOpen failed. uid:%" PRIi64, pOffset->uid);
return -1; return -1;
@ -1199,7 +1192,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0); STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
int32_t size = tableListGetSize(pTaskInfo->pTableInfoList); int32_t size = tableListGetSize(pTaskInfo->pTableInfoList);
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL); tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL, false);
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName); strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);

View File

@ -447,9 +447,14 @@ void* destroyStreamFillSupporter(SStreamFillSupporter* pFillSup) {
return NULL; return NULL;
} }
void destroySPoint(void* ptr) {
SPoint* point = (SPoint*) ptr;
taosMemoryFreeClear(point->val);
}
void* destroyStreamFillLinearInfo(SStreamFillLinearInfo* pFillLinear) { void* destroyStreamFillLinearInfo(SStreamFillLinearInfo* pFillLinear) {
taosArrayDestroy(pFillLinear->pDeltaVal); taosArrayDestroyEx(pFillLinear->pEndPoints, destroySPoint);
taosArrayDestroy(pFillLinear->pNextDeltaVal); taosArrayDestroyEx(pFillLinear->pNextEndPoints, destroySPoint);
taosMemoryFree(pFillLinear); taosMemoryFree(pFillLinear);
return NULL; return NULL;
} }
@ -611,19 +616,15 @@ static void calcDeltaData(SSDataBlock* pBlock, int32_t rowId, SResultRowData* pR
} }
} }
static void calcRowDeltaData(SResultRowData* pStartRow, SResultRowData* pEndRow, SArray* pDelta, SFillColInfo* pFillCol, static void calcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol,
int32_t numOfCol, int32_t winCount) { int32_t numOfCol) {
for (int32_t i = 0; i < numOfCol; i++) { for (int32_t i = 0; i < numOfCol; i++) {
if (!pFillCol[i].notFillCol) { if (!pFillCol[i].notFillCol) {
int32_t slotId = GET_DEST_SLOT_ID(pFillCol + i); int32_t slotId = GET_DEST_SLOT_ID(pFillCol + i);
SResultCellData* pSCell = getResultCell(pStartRow, slotId);
double start = 0.0;
GET_TYPED_DATA(start, double, pSCell->type, pSCell->pData);
SResultCellData* pECell = getResultCell(pEndRow, slotId); SResultCellData* pECell = getResultCell(pEndRow, slotId);
double end = 0.0; SPoint* pPoint = taosArrayGet(pEndPoins, slotId);
GET_TYPED_DATA(end, double, pECell->type, pECell->pData); pPoint->key = pEndRow->key;
double delta = (end - start) / winCount; memcpy(pPoint->val, pECell->pData, pECell->bytes);
taosArraySet(pDelta, slotId, &delta);
} }
} }
} }
@ -674,10 +675,8 @@ void setDeleteFillValueInfo(TSKEY start, TSKEY end, SStreamFillSupporter* pFillS
setFillKeyInfo(pFillSup->prev.key, pFillSup->next.key, &pFillSup->interval, pFillInfo); setFillKeyInfo(pFillSup->prev.key, pFillSup->next.key, &pFillSup->interval, pFillInfo);
pFillInfo->pLinearInfo->hasNext = false; pFillInfo->pLinearInfo->hasNext = false;
pFillInfo->pLinearInfo->nextEnd = INT64_MIN; pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
int32_t numOfWins = taosTimeCountInterval(pFillSup->prev.key, pFillSup->next.key, pFillSup->interval.sliding, calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
pFillSup->interval.slidingUnit, pFillSup->interval.precision); pFillSup->numOfAllCols);
calcRowDeltaData(&pFillSup->prev, &pFillSup->next, pFillInfo->pLinearInfo->pDeltaVal, pFillSup->pAllColInfo,
pFillSup->numOfAllCols, numOfWins);
pFillInfo->pResRow = &pFillSup->prev; pFillInfo->pResRow = &pFillSup->prev;
pFillInfo->pLinearInfo->winIndex = 0; pFillInfo->pLinearInfo->winIndex = 0;
} break; } break;
@ -780,25 +779,19 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS
setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo); setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_MID; pFillInfo->pos = FILL_POS_MID;
pFillInfo->pLinearInfo->nextEnd = nextWKey; pFillInfo->pLinearInfo->nextEnd = nextWKey;
int32_t numOfWins = taosTimeCountInterval(prevWKey, ts, pFillSup->interval.sliding, calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
pFillSup->interval.slidingUnit, pFillSup->interval.precision); pFillSup->numOfAllCols);
calcRowDeltaData(&pFillSup->prev, &pFillSup->cur, pFillInfo->pLinearInfo->pDeltaVal, pFillSup->pAllColInfo,
pFillSup->numOfAllCols, numOfWins);
pFillInfo->pResRow = &pFillSup->prev; pFillInfo->pResRow = &pFillSup->prev;
numOfWins = taosTimeCountInterval(ts, nextWKey, pFillSup->interval.sliding, pFillSup->interval.slidingUnit, calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pNextEndPoints, pFillSup->pAllColInfo,
pFillSup->interval.precision); pFillSup->numOfAllCols);
calcRowDeltaData(&pFillSup->cur, &pFillSup->next, pFillInfo->pLinearInfo->pNextDeltaVal, pFillSup->pAllColInfo,
pFillSup->numOfAllCols, numOfWins);
pFillInfo->pLinearInfo->hasNext = true; pFillInfo->pLinearInfo->hasNext = true;
} else if (hasPrevWindow(pFillSup)) { } else if (hasPrevWindow(pFillSup)) {
setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo); setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_END; pFillInfo->pos = FILL_POS_END;
pFillInfo->pLinearInfo->nextEnd = INT64_MIN; pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
int32_t numOfWins = taosTimeCountInterval(prevWKey, ts, pFillSup->interval.sliding, calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
pFillSup->interval.slidingUnit, pFillSup->interval.precision); pFillSup->numOfAllCols);
calcRowDeltaData(&pFillSup->prev, &pFillSup->cur, pFillInfo->pLinearInfo->pDeltaVal, pFillSup->pAllColInfo,
pFillSup->numOfAllCols, numOfWins);
pFillInfo->pResRow = &pFillSup->prev; pFillInfo->pResRow = &pFillSup->prev;
pFillInfo->pLinearInfo->hasNext = false; pFillInfo->pLinearInfo->hasNext = false;
} else { } else {
@ -806,10 +799,8 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS
setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo); setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_START; pFillInfo->pos = FILL_POS_START;
pFillInfo->pLinearInfo->nextEnd = INT64_MIN; pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
int32_t numOfWins = taosTimeCountInterval(ts, nextWKey, pFillSup->interval.sliding, calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
pFillSup->interval.slidingUnit, pFillSup->interval.precision); pFillSup->numOfAllCols);
calcRowDeltaData(&pFillSup->cur, &pFillSup->next, pFillInfo->pLinearInfo->pDeltaVal, pFillSup->pAllColInfo,
pFillSup->numOfAllCols, numOfWins);
pFillInfo->pResRow = &pFillSup->cur; pFillInfo->pResRow = &pFillSup->cur;
pFillInfo->pLinearInfo->hasNext = false; pFillInfo->pLinearInfo->hasNext = false;
} }
@ -906,13 +897,18 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo*
colDataSetNULL(pColData, index); colDataSetNULL(pColData, index);
continue; continue;
} }
double* pDelta = taosArrayGet(pFillInfo->pLinearInfo->pDeltaVal, slotId); SPoint* pEnd = taosArrayGet(pFillInfo->pLinearInfo->pEndPoints, slotId);
double vCell = 0; double vCell = 0;
GET_TYPED_DATA(vCell, double, pCell->type, pCell->pData); SPoint start = {0};
vCell += (*pDelta) * pFillInfo->pLinearInfo->winIndex; start.key = pFillInfo->pResRow->key;
int64_t result = 0; start.val = pCell->pData;
SET_TYPED_DATA(&result, pCell->type, vCell);
colDataSetVal(pColData, index, (const char*)&result, false); SPoint cur = {0};
cur.key = pFillInfo->current;
cur.val = taosMemoryCalloc(1, pCell->bytes);
taosGetLinearInterpolationVal(&cur, pCell->type, &start, pEnd, pCell->type);
colDataSetVal(pColData, index, (const char*)cur.val, false);
destroySPoint(&cur);
} }
} }
pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit, pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
@ -953,8 +949,7 @@ static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter*
if (pFillInfo->current > pFillInfo->end && pFillInfo->pLinearInfo->hasNext) { if (pFillInfo->current > pFillInfo->end && pFillInfo->pLinearInfo->hasNext) {
pFillInfo->pLinearInfo->hasNext = false; pFillInfo->pLinearInfo->hasNext = false;
pFillInfo->pLinearInfo->winIndex = 0; pFillInfo->pLinearInfo->winIndex = 0;
taosArrayClear(pFillInfo->pLinearInfo->pDeltaVal); taosArraySwap(pFillInfo->pLinearInfo->pEndPoints, pFillInfo->pLinearInfo->pNextEndPoints);
taosArrayAddAll(pFillInfo->pLinearInfo->pDeltaVal, pFillInfo->pLinearInfo->pNextDeltaVal);
pFillInfo->pResRow = &pFillSup->cur; pFillInfo->pResRow = &pFillSup->cur;
setFillKeyInfo(pFillSup->cur.key, pFillInfo->pLinearInfo->nextEnd, &pFillSup->interval, pFillInfo); setFillKeyInfo(pFillSup->cur.key, pFillInfo->pLinearInfo->nextEnd, &pFillSup->interval, pFillInfo);
doStreamFillLinear(pFillSup, pFillInfo, pRes); doStreamFillLinear(pFillSup, pFillInfo, pRes);
@ -1359,15 +1354,19 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock*
pFillInfo->pLinearInfo = taosMemoryCalloc(1, sizeof(SStreamFillLinearInfo)); pFillInfo->pLinearInfo = taosMemoryCalloc(1, sizeof(SStreamFillLinearInfo));
pFillInfo->pLinearInfo->hasNext = false; pFillInfo->pLinearInfo->hasNext = false;
pFillInfo->pLinearInfo->nextEnd = INT64_MIN; pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
pFillInfo->pLinearInfo->pDeltaVal = NULL; pFillInfo->pLinearInfo->pEndPoints = NULL;
pFillInfo->pLinearInfo->pNextDeltaVal = NULL; pFillInfo->pLinearInfo->pNextEndPoints = NULL;
if (pFillSup->type == TSDB_FILL_LINEAR) { if (pFillSup->type == TSDB_FILL_LINEAR) {
pFillInfo->pLinearInfo->pDeltaVal = taosArrayInit(pFillSup->numOfAllCols, sizeof(double)); pFillInfo->pLinearInfo->pEndPoints = taosArrayInit(pFillSup->numOfAllCols, sizeof(SPoint));
pFillInfo->pLinearInfo->pNextDeltaVal = taosArrayInit(pFillSup->numOfAllCols, sizeof(double)); pFillInfo->pLinearInfo->pNextEndPoints = taosArrayInit(pFillSup->numOfAllCols, sizeof(SPoint));
for (int32_t i = 0; i < pFillSup->numOfAllCols; i++) { for (int32_t i = 0; i < pFillSup->numOfAllCols; i++) {
double value = 0.0; SColumnInfoData* pColData = taosArrayGet(pRes->pDataBlock, i);
taosArrayPush(pFillInfo->pLinearInfo->pDeltaVal, &value); SPoint value = {0};
taosArrayPush(pFillInfo->pLinearInfo->pNextDeltaVal, &value); value.val = taosMemoryCalloc(1, pColData->info.bytes);
taosArrayPush(pFillInfo->pLinearInfo->pEndPoints, &value);
value.val = taosMemoryCalloc(1, pColData->info.bytes);
taosArrayPush(pFillInfo->pLinearInfo->pNextEndPoints, &value);
} }
} }
pFillInfo->pLinearInfo->winIndex = 0; pFillInfo->pLinearInfo->winIndex = 0;

View File

@ -31,6 +31,9 @@
#include "thash.h" #include "thash.h"
#include "ttypes.h" #include "ttypes.h"
int32_t scanDebug = 0;
#define MULTI_READER_MAX_TABLE_NUM 5000 #define MULTI_READER_MAX_TABLE_NUM 5000
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN) #define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
@ -654,9 +657,10 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
continue; continue;
} }
ASSERT(pBlock->info.id.uid != 0); if (pBlock->info.id.uid) {
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid); pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
}
uint32_t status = 0; uint32_t status = 0;
int32_t code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status); int32_t code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -680,7 +684,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid; pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid;
pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey; pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;
ASSERT(pBlock->info.id.uid != 0);
return pBlock; return pBlock;
} }
return NULL; return NULL;
@ -785,7 +788,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
ASSERT(pInfo->base.dataReader == NULL); ASSERT(pInfo->base.dataReader == NULL);
int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock, int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
(STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo)); (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), pInfo->countOnly);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
@ -797,7 +800,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
SSDataBlock* result = doGroupedTableScan(pOperator); SSDataBlock* result = doGroupedTableScan(pOperator);
if (result != NULL) { if (result != NULL) {
ASSERT(result->info.id.uid != 0);
return result; return result;
} }
@ -917,6 +919,10 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
goto _error; goto _error;
} }
if (scanDebug) {
pInfo->countOnly = true;
}
taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false); taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo, pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
optrDefaultBufFn, getTableScannerExecInfo); optrDefaultBufFn, getTableScannerExecInfo);
@ -1008,7 +1014,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
SSDataBlock* pBlock = pTableScanInfo->pResBlock; SSDataBlock* pBlock = pTableScanInfo->pResBlock;
STsdbReader* pReader = NULL; STsdbReader* pReader = NULL;
int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock, int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
(STsdbReader**)&pReader, GET_TASKID(pTaskInfo)); (STsdbReader**)&pReader, GET_TASKID(pTaskInfo), false);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
@ -1803,6 +1809,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
printDataBlock(pInfo->pUpdateRes, "recover update");
return pInfo->pUpdateRes; return pInfo->pUpdateRes;
} break; } break;
case STREAM_SCAN_FROM_DATAREADER_RANGE: { case STREAM_SCAN_FROM_DATAREADER_RANGE: {
@ -1813,7 +1820,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version); updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
checkUpdateData(pInfo, true, pSDB, false); checkUpdateData(pInfo, true, pSDB, false);
// printDataBlock(pSDB, "stream scan update"); printDataBlock(pSDB, "scan recover update");
calBlockTbName(pInfo, pSDB); calBlockTbName(pInfo, pSDB);
return pSDB; return pSDB;
} }
@ -1838,6 +1845,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
} }
if (pInfo->pCreateTbRes->info.rows > 0) { if (pInfo->pCreateTbRes->info.rows > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_RES; pInfo->scanMode = STREAM_SCAN_FROM_RES;
printDataBlock(pInfo->pCreateTbRes, "recover createTbl");
return pInfo->pCreateTbRes; return pInfo->pCreateTbRes;
} }
qDebug("stream recover scan get block, rows %d", pInfo->pRecoverRes->info.rows); qDebug("stream recover scan get block, rows %d", pInfo->pRecoverRes->info.rows);
@ -2600,7 +2608,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
SReadHandle* pHandle = &pInfo->base.readHandle; SReadHandle* pHandle = &pInfo->base.readHandle;
if (NULL == source->dataReader || !source->multiReader) { if (NULL == source->dataReader || !source->multiReader) {
code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo)); code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo), false);
if (code != 0) { if (code != 0) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }

View File

@ -2267,7 +2267,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
size_t num = tableListGetSize(pTableListInfo); size_t num = tableListGetSize(pTableListInfo);
void* pList = tableListGetInfo(pTableListInfo, 0); void* pList = tableListGetInfo(pTableListInfo, 0);
code = tsdbReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, &pInfo->pHandle, pTaskInfo->id.str); code = tsdbReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, &pInfo->pHandle, pTaskInfo->id.str, false);
cleanupQueryTableDataCond(&cond); cleanupQueryTableDataCond(&cond);
if (code != 0) { if (code != 0) {
goto _error; goto _error;

View File

@ -2110,10 +2110,12 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3
} else if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) { } else if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) {
int32_t code = pDestCtx[k].fpSet.combine(&pDestCtx[k], &pSourceCtx[k]); int32_t code = pDestCtx[k].fpSet.combine(&pDestCtx[k], &pSourceCtx[k]);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("%s apply functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code)); qError("%s apply combine functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
} }
} else if (pDestCtx[k].fpSet.combine == NULL) {
char* funName = fmGetFuncName(pDestCtx[k].functionId);
qError("%s error, combine funcion for %s is not implemented", GET_TASKID(pTaskInfo), funName);
taosMemoryFreeClear(funName);
} }
} }
} }
@ -2769,6 +2771,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK); pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK);
pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE); pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired; pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
pInfo->ignoreExpiredDataSaved = false;
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
pInfo->delIndex = 0; pInfo->delIndex = 0;
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
@ -3587,6 +3590,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pInfo->isFinal = false; pInfo->isFinal = false;
pInfo->pPhyNode = pPhyNode; pInfo->pPhyNode = pPhyNode;
pInfo->ignoreExpiredData = pSessionNode->window.igExpired; pInfo->ignoreExpiredData = pSessionNode->window.igExpired;
pInfo->ignoreExpiredDataSaved = false;
pInfo->pUpdated = NULL; pInfo->pUpdated = NULL;
pInfo->pStUpdated = NULL; pInfo->pStUpdated = NULL;
@ -4112,6 +4116,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
pInfo->pChildren = NULL; pInfo->pChildren = NULL;
pInfo->ignoreExpiredData = pStateNode->window.igExpired; pInfo->ignoreExpiredData = pStateNode->window.igExpired;
pInfo->ignoreExpiredDataSaved = false;
pInfo->pUpdated = NULL; pInfo->pUpdated = NULL;
pInfo->pSeUpdated = NULL; pInfo->pSeUpdated = NULL;
@ -4885,6 +4890,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->interval = interval; pInfo->interval = interval;
pInfo->twAggSup = twAggSupp; pInfo->twAggSup = twAggSupp;
pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired; pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
pInfo->ignoreExpiredDataSaved = false;
pInfo->isFinal = false; pInfo->isFinal = false;
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;

View File

@ -235,6 +235,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getGroupKeyFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getGroupKeyFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t groupKeyFunction(SqlFunctionCtx* pCtx); int32_t groupKeyFunction(SqlFunctionCtx* pCtx);
int32_t groupKeyFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t groupKeyFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t groupKeyCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -2480,7 +2480,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "irate", .name = "irate",
.type = FUNCTION_TYPE_IRATE, .type = FUNCTION_TYPE_IRATE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.translateFunc = translateIrate, .translateFunc = translateIrate,
.getEnvFunc = getIrateFuncEnv, .getEnvFunc = getIrateFuncEnv,
.initFunc = irateFuncSetup, .initFunc = irateFuncSetup,
@ -3234,6 +3234,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = functionSetup, .initFunc = functionSetup,
.processFunc = groupKeyFunction, .processFunc = groupKeyFunction,
.finalizeFunc = groupKeyFinalize, .finalizeFunc = groupKeyFinalize,
.combineFunc = groupKeyCombine,
.pPartialFunc = "_group_key", .pPartialFunc = "_group_key",
.pMergeFunc = "_group_key" .pMergeFunc = "_group_key"
}, },

View File

@ -5900,6 +5900,39 @@ int32_t groupKeyFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return pResInfo->numOfRes; return pResInfo->numOfRes;
} }
int32_t groupKeyCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
SGroupKeyInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
SGroupKeyInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
// escape rest of data blocks to avoid first entry to be overwritten.
if (pDBuf->hasResult) {
goto _group_key_over;
}
if (pSBuf->isNull) {
pDBuf->isNull = true;
pDBuf->hasResult = true;
goto _group_key_over;
}
if (IS_VAR_DATA_TYPE(pSourceCtx->resDataInfo.type)) {
memcpy(pDBuf->data, pSBuf->data,
(pSourceCtx->resDataInfo.type == TSDB_DATA_TYPE_JSON) ? getJsonValueLen(pSBuf->data) : varDataTLen(pSBuf->data));
} else {
memcpy(pDBuf->data, pSBuf->data, pSourceCtx->resDataInfo.bytes);
}
pDBuf->hasResult = true;
_group_key_over:
SET_VAL(pDResInfo, 1, 1);
return TSDB_CODE_SUCCESS;
}
int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx) { int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElems = 0; int32_t numOfElems = 0;

View File

@ -447,3 +447,10 @@ int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc
return code; return code;
} }
char* fmGetFuncName(int32_t funcId) {
if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
return taosStrdup("invalid function");
}
return taosStrdup(funcMgtBuiltins[funcId].name);
}

View File

@ -566,53 +566,19 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList,
return code; return code;
} }
static int bindFileds(SBoundColInfo* pBoundInfo, SSchema* pSchema, TAOS_FIELD* fields, int numFields) { static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
bool* pUseCols = taosMemoryCalloc(pBoundInfo->numOfCols, sizeof(bool));
if (NULL == pUseCols) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pBoundInfo->numOfBound = 0;
int16_t lastColIdx = -1; // last column found
int32_t code = TSDB_CODE_SUCCESS;
for (int i = 0; i < numFields; i++) { for (int i = 0; i < numFields; i++) {
SToken token; if(strcmp(pSchema->name, fields[i].name) == 0){
token.z = fields[i].name; return true;
token.n = strlen(fields[i].name);
int16_t t = lastColIdx + 1;
int16_t index = insFindCol(&token, t, pBoundInfo->numOfCols, pSchema);
if (index < 0 && t > 0) {
index = insFindCol(&token, 0, t, pSchema);
}
if (index < 0) {
uError("can not find column name:%s", token.z);
code = TSDB_CODE_PAR_INVALID_COLUMN;
break;
} else if (pUseCols[index]) {
code = TSDB_CODE_PAR_INVALID_COLUMN;
uError("duplicated column name:%s", token.z);
break;
} else {
lastColIdx = index;
pUseCols[index] = true;
pBoundInfo->pColIndex[pBoundInfo->numOfBound] = index;
++pBoundInfo->numOfBound;
} }
} }
if (TSDB_CODE_SUCCESS == code && !pUseCols[0]) { return false;
uError("primary timestamp column can not be null:");
code = TSDB_CODE_PAR_INVALID_COLUMN;
}
taosMemoryFree(pUseCols);
return code;
} }
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD* tFields, int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD* tFields,
int numFields, bool needChangeLength) { int numFields, bool needChangeLength) {
void* tmp = taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid));
STableDataCxt* pTableCxt = NULL; STableDataCxt* pTableCxt = NULL;
int ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, int ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
sizeof(pTableMeta->uid), pTableMeta, &pCreateTb, &pTableCxt, true); sizeof(pTableMeta->uid), pTableMeta, &pCreateTb, &pTableCxt, true);
@ -620,19 +586,14 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
uError("insGetTableDataCxt error"); uError("insGetTableDataCxt error");
goto end; goto end;
} }
if (tFields != NULL) {
ret = bindFileds(&pTableCxt->boundColsInfo, getTableColumnSchema(pTableMeta), tFields, numFields); if(tmp == NULL){
ret = initTableColSubmitData(pTableCxt);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
uError("bindFileds error"); uError("initTableColSubmitData error");
goto end; goto end;
} }
} }
// no need to bind, because select * get all fields
ret = initTableColSubmitData(pTableCxt);
if (ret != TSDB_CODE_SUCCESS) {
uError("initTableColSubmitData error");
goto end;
}
char* p = (char*)data; char* p = (char*)data;
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
@ -660,35 +621,43 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
SSchema* pSchema = getTableColumnSchema(pTableCxt->pMeta); SSchema* pSchema = getTableColumnSchema(pTableCxt->pMeta);
SBoundColInfo* boundInfo = &pTableCxt->boundColsInfo; SBoundColInfo* boundInfo = &pTableCxt->boundColsInfo;
if (boundInfo->numOfBound != numOfCols) { if (numFields != numOfCols) {
uError("boundInfo->numOfBound:%d != numOfCols:%d", boundInfo->numOfBound, numOfCols); uError("numFields:%d != numOfCols:%d", numFields, numOfCols);
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
if (numFields > boundInfo->numOfBound) {
uError("numFields:%d > boundInfo->numOfBound:%d", numFields, boundInfo->numOfBound);
ret = TSDB_CODE_INVALID_PARA; ret = TSDB_CODE_INVALID_PARA;
goto end; goto end;
} }
for (int c = 0; c < boundInfo->numOfBound; ++c) { for (int c = 0; c < boundInfo->numOfBound; ++c) {
SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]]; SSchema* pColSchema = &pSchema[c];
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c); SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c);
if(findFileds(pColSchema, tFields, numFields)){
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
uError("type or bytes not equal");
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) { int8_t* offset = pStart;
uError("type or bytes not equal"); if (IS_VAR_DATA_TYPE(pColSchema->type)) {
ret = TSDB_CODE_INVALID_PARA; pStart += numOfRows * sizeof(int32_t);
goto end; } else {
} pStart += BitmapLen(numOfRows);
}
char* pData = pStart;
int8_t* offset = pStart; tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);
if (IS_VAR_DATA_TYPE(pColSchema->type)) { fields += sizeof(int8_t) + sizeof(int32_t);
pStart += numOfRows * sizeof(int32_t); if (needChangeLength) {
} else { pStart += htonl(colLength[c]);
pStart += BitmapLen(numOfRows); } else {
} pStart += colLength[c];
char* pData = pStart; }
}else{
tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData); tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, NULL, NULL);
fields += sizeof(int8_t) + sizeof(int32_t);
if (needChangeLength) {
pStart += htonl(colLength[c]);
} else {
pStart += colLength[c];
} }
} }

View File

@ -444,7 +444,7 @@ static int32_t getInsTagsTableTargetNameFromOp(int32_t acctId, SOperatorNode* pO
} else if (QUERY_NODE_VALUE == nodeType(pOper->pRight)) { } else if (QUERY_NODE_VALUE == nodeType(pOper->pRight)) {
pVal = (SValueNode*)pOper->pRight; pVal = (SValueNode*)pOper->pRight;
} }
if (NULL == pCol || NULL == pVal) { if (NULL == pCol || NULL == pVal || NULL == pVal->literal || 0 == strcmp(pVal->literal, "")) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -152,8 +152,14 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
if (batchCnt >= batchSz) break; if (batchCnt >= batchSz) break;
} }
if (taosArrayGetSize(pRes) == 0) { if (taosArrayGetSize(pRes) == 0) {
taosArrayDestroy(pRes); if (finished) {
break; taosArrayDestroy(pRes);
qDebug("task %d finish recover exec task ", pTask->taskId);
break;
} else {
qDebug("task %d continue recover exec task ", pTask->taskId);
continue;
}
} }
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
if (qRes == NULL) { if (qRes == NULL) {

View File

@ -871,6 +871,7 @@
,,y,script,./test.sh -f tsim/query/emptyTsRange.sim ,,y,script,./test.sh -f tsim/query/emptyTsRange.sim
,,y,script,./test.sh -f tsim/query/partitionby.sim ,,y,script,./test.sh -f tsim/query/partitionby.sim
,,y,script,./test.sh -f tsim/query/tableCount.sim ,,y,script,./test.sh -f tsim/query/tableCount.sim
,,y,script,./test.sh -f tsim/query/nullColSma.sim
,,y,script,./test.sh -f tsim/qnode/basic1.sim ,,y,script,./test.sh -f tsim/qnode/basic1.sim
,,y,script,./test.sh -f tsim/snode/basic1.sim ,,y,script,./test.sh -f tsim/snode/basic1.sim
,,y,script,./test.sh -f tsim/mnode/basic1.sim ,,y,script,./test.sh -f tsim/mnode/basic1.sim

View File

@ -167,8 +167,10 @@ function run_thread() {
local case_build_san=`echo "$line"|cut -d, -f3` local case_build_san=`echo "$line"|cut -d, -f3`
if [ "${case_build_san}" == "y" ]; then if [ "${case_build_san}" == "y" ]; then
case_build_san="y" case_build_san="y"
DEBUGPATH="debugSan"
elif [[ "${case_build_san}" == "n" ]] || [[ "${case_build_san}" == "" ]]; then elif [[ "${case_build_san}" == "n" ]] || [[ "${case_build_san}" == "" ]]; then
case_build_san="n" case_build_san="n"
DEBUGPATH="debugNoSan"
else else
usage usage
exit 1 exit 1
@ -301,10 +303,10 @@ function run_thread() {
if [ ! -z "$corefile" ]; then if [ ! -z "$corefile" ]; then
echo -e "\e[34m corefiles: $corefile \e[0m" echo -e "\e[34m corefiles: $corefile \e[0m"
local build_dir=$log_dir/build_${hosts[index]} local build_dir=$log_dir/build_${hosts[index]}
local remote_build_dir="${workdirs[index]}/TDengine/debug/build" local remote_build_dir="${workdirs[index]}/{DEBUGPATH}/build"
if [ $ent -ne 0 ]; then # if [ $ent -ne 0 ]; then
remote_build_dir="${workdirs[index]}/TDinternal/debug/build" # remote_build_dir="${workdirs[index]}/{DEBUGPATH}/build"
fi # fi
mkdir $build_dir 2>/dev/null mkdir $build_dir 2>/dev/null
if [ $? -eq 0 ]; then if [ $? -eq 0 ]; then
# scp build binary # scp build binary

View File

@ -0,0 +1,139 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
$dbPrefix = m_in_db
$tbPrefix = m_in_tb
$mtPrefix = m_in_mt
$tbNum = 1
$rowNum = 200
$totalNum = 400
print =============== step1
$i = 0
$db = $dbPrefix . $i
$mt = $mtPrefix . $i
sql drop database if exists $db
sql create database $db vgroups 1 maxrows 200 minrows 10;
sql use $db
sql create table $mt (ts timestamp, f1 int, f2 float) TAGS(tgcol int)
print ====== start create child tables and insert data
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
sql create table $tb using $mt tags( $i )
$x = 0
while $x < $rowNum
$cc = $x * 1
$ms = 1601481600000 + $cc
sql insert into $tb values ($ms , NULL , $x )
$x = $x + 1
endw
$i = $i + 1
endw
$i = 1
$tb = $tbPrefix . $i
sql create table $tb using $mt tags( $i )
$x = 0
while $x < $rowNum
$cc = $x * 1
$ms = 1601481600000 + $cc
sql insert into $tb values ($ms , $x , NULL )
$x = $x + 1
endw
sql flush database $db
print =============== step2
$i = 0
$tb = $tbPrefix . $i
sql select max(f1) from $tb
if $rows != 1 then
return -1
endi
if $data00 != NULL then
return -1
endi
$i = 1
$tb = $tbPrefix . $i
sql select max(f2) from $tb
if $rows != 1 then
return -1
endi
if $data00 != NULL then
return -1
endi
$rowNum = 10
print ====== insert more data
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
$x = 0
while $x < $rowNum
$cc = $x * 1
$ms = 1601481700000 + $cc
sql insert into $tb values ($ms , $x , $x )
$x = $x + 1
endw
$i = $i + 1
endw
$i = 1
$tb = $tbPrefix . $i
$x = 0
while $x < $rowNum
$cc = $x * 1
$ms = 1601481700000 + $cc
sql insert into $tb values ($ms , $x , $x )
$x = $x + 1
endw
sql flush database $db
print =============== step3
$i = 0
$tb = $tbPrefix . $i
sql select max(f1) from $tb
if $rows != 1 then
return -1
endi
if $data00 != 9 then
return -1
endi
$i = 1
$tb = $tbPrefix . $i
sql select max(f2) from $tb
if $rows != 1 then
return -1
endi
if $data00 != 9.00000 then
print $data00
return -1
endi
print =============== clear
#sql drop database $db
#sql select * from information_schema.ins_databases
#if $rows != 0 then
# return -1
#endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -51,6 +51,11 @@ if $data00 != @ins_stables@ then
return -1 return -1
endi endi
sql select * from information_schema.ins_tables where table_name='';
if $rows != 0 then
return -1
endi
sql select tbname from information_schema.ins_tables; sql select tbname from information_schema.ins_tables;
print $rows $data00 print $rows $data00
if $rows != 33 then if $rows != 33 then

View File

@ -272,4 +272,122 @@ if $data12 != 2 then
goto loop3 goto loop3
endi endi
print ===== step3
sql drop database if exists test4;
sql create database test4 vgroups 10;
sql use test4;
sql create stable st(ts timestamp,a int,b int,c varchar(250) ) tags(ta int,tb int,tc int);
sql create table aaa using st tags(1,1,1);
sql create table bbb using st tags(2,2,2);
sql create table ccc using st tags(3,2,2);
sql create table ddd using st tags(4,2,2);
sql create stream streams1 ignore expired 0 fill_history 0 watermark 3s into streamst subtable(c) as select _wstart, c , count(*) c1 from st partition by c interval(1s) ;
sql insert into aaa values(1648791221001,2,2,"/a1/aa/aa");
sql insert into bbb values(1648791221001,2,2,"/a1/aa/aa");
sql insert into ccc values(1648791221001,2,2,"/a1/aa/aa");
sql insert into ddd values(1648791221001,2,2,"/a1/aa/aa");
sql insert into aaa values(1648791222002,2,2,"/a2/aa/aa");
sql insert into bbb values(1648791222002,2,2,"/a2/aa/aa");
sql insert into ccc values(1648791222002,2,2,"/a2/aa/aa");
sql insert into ddd values(1648791222002,2,2,"/a2/aa/aa");
sql insert into aaa values(1648791223003,2,2,"/a3/aa/aa");
sql insert into bbb values(1648791223003,2,2,"/a3/aa/aa");
sql insert into ccc values(1648791223003,2,2,"/a3/aa/aa");
sql insert into ddd values(1648791223003,2,2,"/a3/aa/aa");
sql insert into aaa values(1648791224003,2,2,"/a4/aa/aa");
sql insert into bbb values(1648791224003,2,2,"/a4/aa/aa");
sql insert into ccc values(1648791224003,2,2,"/a4/aa/aa");
sql insert into ddd values(1648791224003,2,2,"/a4/aa/aa");
sql insert into aaa values(1648791225003,2,2,"/a5/aa/aa");
sql insert into bbb values(1648791225003,2,2,"/a5/aa/aa");
sql insert into ccc values(1648791225003,2,2,"/a5/aa/aa");
sql insert into ddd values(1648791225003,2,2,"/a5/aa/aa");
sql insert into aaa values(1648791226003,2,2,"/a6/aa/aa");
sql insert into bbb values(1648791226003,2,2,"/a6/aa/aa");
sql insert into ccc values(1648791226003,2,2,"/a6/aa/aa");
sql insert into ddd values(1648791226003,2,2,"/a6/aa/aa");
$loop_count = 0
loop4:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sql select * from streamst;
if $rows == 0 then
goto loop4
endi
sql delete from aaa where ts = 1648791223003 ;
$loop_count = 0
loop5:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sql select * from streamst;
if $rows == 0 then
goto loop5
endi
sql delete from ccc;
$loop_count = 0
loop6:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sql select * from streamst;
if $rows == 0 then
goto loop6
endi
sql delete from ddd;
$loop_count = 0
loop7:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sql select * from streamst;
if $rows == 0 then
goto loop7
endi
print ===== over
system sh/stop_dnodes.sh system sh/stop_dnodes.sh

View File

@ -100,7 +100,7 @@ class TDTestCase:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
if not ((totalConsumeRows >= expectrowcnt * 3/4) and (totalConsumeRows < expectrowcnt)): if not ((totalConsumeRows >= expectrowcnt * 3/4) and (totalConsumeRows <= expectrowcnt)):
tdLog.exit("tmq consume rows error with snapshot = 0!") tdLog.exit("tmq consume rows error with snapshot = 0!")
tdLog.info("wait subscriptions exit ....") tdLog.info("wait subscriptions exit ....")
@ -131,7 +131,7 @@ class TDTestCase:
'batchNum': 100, 'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'endTs': 0, 'endTs': 0,
'pollDelay': 10, 'pollDelay': 20,
'showMsg': 1, 'showMsg': 1,
'showRow': 1, 'showRow': 1,
'snapshot': 0} 'snapshot': 0}
@ -193,7 +193,7 @@ class TDTestCase:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
if not ((totalConsumeRows >= expectrowcnt / 2 * (1 + 3/4)) and (totalConsumeRows < expectrowcnt)): if not ((totalConsumeRows >= expectrowcnt / 2 * (1 + 3/4)) and (totalConsumeRows <= expectrowcnt)):
tdLog.exit("tmq consume rows error with snapshot = 0!") tdLog.exit("tmq consume rows error with snapshot = 0!")
tdLog.info("wait subscriptions exit ....") tdLog.info("wait subscriptions exit ....")

View File

@ -541,7 +541,7 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
printf("%*" PRIu64, width, *((uint64_t *)val)); printf("%*" PRIu64, width, *((uint64_t *)val));
break; break;
case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_FLOAT:
printf("%*.5f", width, GET_FLOAT_VAL(val)); printf("%*ef", width, GET_FLOAT_VAL(val));
break; break;
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", width, GET_DOUBLE_VAL(val)); n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", width, GET_DOUBLE_VAL(val));