Merge pull request #11163 from taosdata/feature/tq

fix
This commit is contained in:
Liu Jicong 2022-03-31 19:04:47 +08:00 committed by GitHub
commit b06db715bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 386 additions and 261 deletions

View File

@ -5,6 +5,7 @@ AccessModifierOffset: -1
AlignAfterOpenBracket: Align AlignAfterOpenBracket: Align
AlignConsecutiveAssignments: false AlignConsecutiveAssignments: false
AlignConsecutiveDeclarations: true AlignConsecutiveDeclarations: true
AlignConsecutiveMacros: true
AlignEscapedNewlinesLeft: true AlignEscapedNewlinesLeft: true
AlignOperands: true AlignOperands: true
AlignTrailingComments: true AlignTrailingComments: true

View File

@ -28,7 +28,7 @@ int32_t init_env() {
return -1; return -1;
} }
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1"); TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes)); printf("error in create db, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
@ -42,25 +42,33 @@ int32_t init_env() {
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)"); pRes =
taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists tu1 using st1 tags(1)"); pRes = taos_query(pConn, "create table if not exists ct0 using st1 tags(1000)");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes)); printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags(2)"); pRes = taos_query(pConn, "create table if not exists ct1 using st1 tags(2000)");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes)); printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
pRes = taos_query(pConn, "create table if not exists ct3 using st1 tags(3000)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table tu3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes); taos_free_result(pRes);
return 0; return 0;
} }
@ -82,12 +90,40 @@ int32_t create_topic() {
/*const char* sql = "select * from tu1";*/ /*const char* sql = "select * from tu1";*/
/*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));*/ /*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));*/
pRes = taos_query(pConn, "create topic test_stb_topic_1 as select * from tu1"); pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1 from ct1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes)); printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
#if 0
pRes = taos_query(pConn, "insert into tu1 values(now, 1, 1.0, 'bi1')");
if (taos_errno(pRes) != 0) {
printf("failed to insert, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tu1 values(now+1d, 1, 1.0, 'bi1')");
if (taos_errno(pRes) != 0) {
printf("failed to insert, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tu2 values(now, 2, 2.0, 'bi2')");
if (taos_errno(pRes) != 0) {
printf("failed to insert, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tu2 values(now+1d, 2, 2.0, 'bi2')");
if (taos_errno(pRes) != 0) {
printf("failed to insert, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
#endif
taos_close(pConn); taos_close(pConn);
return 0; return 0;
} }
@ -115,7 +151,7 @@ tmq_t* build_consumer() {
tmq_list_t* build_topic_list() { tmq_list_t* build_topic_list() {
tmq_list_t* topic_list = tmq_list_new(); tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_stb_topic_1"); tmq_list_append(topic_list, "topic_ctb_column");
return topic_list; return topic_list;
} }
@ -215,8 +251,8 @@ int main(int argc, char* argv[]) {
if (argc > 1) { if (argc > 1) {
printf("env init\n"); printf("env init\n");
code = init_env(); code = init_env();
}
create_topic(); create_topic();
}
tmq_t* tmq = build_consumer(); tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list(); tmq_list_t* topic_list = build_topic_list();
/*perf_loop(tmq, topic_list);*/ /*perf_loop(tmq, topic_list);*/

View File

@ -20,7 +20,7 @@
#include "taos.h" #include "taos.h"
int32_t init_env() { int32_t init_env() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 7010); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) { if (pConn == NULL) {
return -1; return -1;
} }
@ -65,7 +65,7 @@ int32_t init_env() {
int32_t create_stream() { int32_t create_stream() {
printf("create stream\n"); printf("create stream\n");
TAOS_RES* pRes; TAOS_RES* pRes;
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 7010); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) { if (pConn == NULL) {
return -1; return -1;
} }

View File

@ -37,6 +37,14 @@ enum {
TMQ_MSG_TYPE__EP_RSP, TMQ_MSG_TYPE__EP_RSP,
}; };
enum {
STREAM_TRIGGER__AT_ONCE = 1,
STREAM_TRIGGER__WINDOW_CLOSE,
STREAM_TRIGGER__BY_COUNT,
STREAM_TRIGGER__BY_BATCH_COUNT,
STREAM_TRIGGER__BY_EVENT_TIME,
};
typedef struct { typedef struct {
uint32_t numOfTables; uint32_t numOfTables;
SArray* pGroupList; SArray* pGroupList;
@ -59,7 +67,10 @@ typedef struct SDataBlockInfo {
int32_t rowSize; int32_t rowSize;
int16_t numOfCols; int16_t numOfCols;
int16_t hasVarCol; int16_t hasVarCol;
union {int64_t uid; int64_t blockId;}; union {
int64_t uid;
int64_t blockId;
};
int64_t groupId; // no need to serialize int64_t groupId; // no need to serialize
} SDataBlockInfo; } SDataBlockInfo;
@ -93,7 +104,7 @@ void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock);
int32_t tEncodeDataBlocks(void** buf, const SArray* blocks); int32_t tEncodeDataBlocks(void** buf, const SArray* blocks);
void* tDecodeDataBlocks(const void* buf, SArray** blocks); void* tDecodeDataBlocks(const void* buf, SArray** blocks);
void colDataDestroy(SColumnInfoData* pColData) ; void colDataDestroy(SColumnInfoData* pColData);
static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) { static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) {
// WARNING: do not use info.numOfCols, // WARNING: do not use info.numOfCols,

View File

@ -98,7 +98,7 @@ typedef void *SRow;
typedef struct { typedef struct {
TDRowValT valType; TDRowValT valType;
void * val; void *val;
} SCellVal; } SCellVal;
typedef struct { typedef struct {
@ -158,8 +158,8 @@ typedef struct {
int16_t nBitmaps; int16_t nBitmaps;
int16_t nBoundBitmaps; int16_t nBoundBitmaps;
int32_t offset; int32_t offset;
void * pBitmap; void *pBitmap;
void * pOffset; void *pOffset;
int32_t extendedRowSize; int32_t extendedRowSize;
} SRowBuilder; } SRowBuilder;
@ -275,7 +275,7 @@ static FORCE_INLINE int32_t tdSetBitmapValType(void *pBitmap, int16_t colIdx, TD
} }
int16_t nBytes = colIdx / TD_VTYPE_PARTS; int16_t nBytes = colIdx / TD_VTYPE_PARTS;
int16_t nOffset = colIdx & TD_VTYPE_OPTR; int16_t nOffset = colIdx & TD_VTYPE_OPTR;
char * pDestByte = (char *)POINTER_SHIFT(pBitmap, nBytes); char *pDestByte = (char *)POINTER_SHIFT(pBitmap, nBytes);
switch (nOffset) { switch (nOffset) {
case 0: case 0:
*pDestByte = ((*pDestByte) & 0x3F) | (valType << 6); *pDestByte = ((*pDestByte) & 0x3F) | (valType << 6);
@ -313,7 +313,7 @@ static FORCE_INLINE int32_t tdGetBitmapValType(void *pBitmap, int16_t colIdx, TD
} }
int16_t nBytes = colIdx / TD_VTYPE_PARTS; int16_t nBytes = colIdx / TD_VTYPE_PARTS;
int16_t nOffset = colIdx & TD_VTYPE_OPTR; int16_t nOffset = colIdx & TD_VTYPE_OPTR;
char * pDestByte = (char *)POINTER_SHIFT(pBitmap, nBytes); char *pDestByte = (char *)POINTER_SHIFT(pBitmap, nBytes);
switch (nOffset) { switch (nOffset) {
case 0: case 0:
*pValType = (((*pDestByte) & 0xC0) >> 6); *pValType = (((*pDestByte) & 0xC0) >> 6);
@ -620,7 +620,7 @@ static FORCE_INLINE int32_t tdAppendColValToKvRow(SRowBuilder *pBuilder, TDRowVa
if (tdValIsNorm(valType, val, colType)) { if (tdValIsNorm(valType, val, colType)) {
// ts key stored in STSRow.ts // ts key stored in STSRow.ts
SKvRowIdx *pColIdx = (SKvRowIdx *)POINTER_SHIFT(TD_ROW_COL_IDX(row), offset); SKvRowIdx *pColIdx = (SKvRowIdx *)POINTER_SHIFT(TD_ROW_COL_IDX(row), offset);
char * ptr = (char *)POINTER_SHIFT(row, TD_ROW_LEN(row)); char *ptr = (char *)POINTER_SHIFT(row, TD_ROW_LEN(row));
pColIdx->colId = colId; pColIdx->colId = colId;
pColIdx->offset = TD_ROW_LEN(row); // the offset include the TD_ROW_HEAD_LEN pColIdx->offset = TD_ROW_LEN(row); // the offset include the TD_ROW_HEAD_LEN
@ -638,7 +638,7 @@ static FORCE_INLINE int32_t tdAppendColValToKvRow(SRowBuilder *pBuilder, TDRowVa
// NULL/None value // NULL/None value
else { else {
SKvRowIdx *pColIdx = (SKvRowIdx *)POINTER_SHIFT(TD_ROW_COL_IDX(row), offset); SKvRowIdx *pColIdx = (SKvRowIdx *)POINTER_SHIFT(TD_ROW_COL_IDX(row), offset);
char * ptr = (char *)POINTER_SHIFT(row, TD_ROW_LEN(row)); char *ptr = (char *)POINTER_SHIFT(row, TD_ROW_LEN(row));
pColIdx->colId = colId; pColIdx->colId = colId;
pColIdx->offset = TD_ROW_LEN(row); // the offset include the TD_ROW_HEAD_LEN pColIdx->offset = TD_ROW_LEN(row); // the offset include the TD_ROW_HEAD_LEN
const void *nullVal = getNullValue(colType); const void *nullVal = getNullValue(colType);
@ -775,8 +775,8 @@ static FORCE_INLINE int32_t tdGetKvRowValOfCol(SCellVal *output, STSRow *pRow, v
typedef struct { typedef struct {
STSchema *pSchema; STSchema *pSchema;
STSRow * pRow; STSRow *pRow;
void * pBitmap; void *pBitmap;
uint32_t offset; uint32_t offset;
col_id_t maxColId; col_id_t maxColId;
col_id_t colIdx; // [PRIMARYKEY_TIMESTAMP_COL_ID, nSchemaCols], PRIMARYKEY_TIMESTAMP_COL_ID equals 1 col_id_t colIdx; // [PRIMARYKEY_TIMESTAMP_COL_ID, nSchemaCols], PRIMARYKEY_TIMESTAMP_COL_ID equals 1
@ -881,7 +881,7 @@ static FORCE_INLINE bool tdGetTpRowDataOfCol(STSRowIter *pIter, col_type_t colTy
// internal // internal
static FORCE_INLINE bool tdGetKvRowValOfColEx(STSRowIter *pIter, col_id_t colId, col_type_t colType, col_id_t *nIdx, static FORCE_INLINE bool tdGetKvRowValOfColEx(STSRowIter *pIter, col_id_t colId, col_type_t colType, col_id_t *nIdx,
SCellVal *pVal) { SCellVal *pVal) {
STSRow * pRow = pIter->pRow; STSRow *pRow = pIter->pRow;
SKvRowIdx *pKvIdx = NULL; SKvRowIdx *pKvIdx = NULL;
bool colFound = false; bool colFound = false;
col_id_t kvNCols = tdRowGetNCols(pRow); col_id_t kvNCols = tdRowGetNCols(pRow);
@ -1076,7 +1076,7 @@ typedef struct {
typedef struct { typedef struct {
STSchema *pSchema; STSchema *pSchema;
STSRow * pRow; STSRow *pRow;
} STSRowReader; } STSRowReader;
typedef struct { typedef struct {

View File

@ -157,6 +157,7 @@ typedef struct SWalReadHandle {
int64_t curVersion; int64_t curVersion;
int64_t capacity; int64_t capacity;
int64_t status; // if cursor valid int64_t status; // if cursor valid
TdThreadMutex mutex;
SWalHead *pHead; SWalHead *pHead;
} SWalReadHandle; } SWalReadHandle;
#pragma pack(pop) #pragma pack(pop)
@ -191,6 +192,7 @@ int32_t walEndSnapshot(SWal *);
SWalReadHandle *walOpenReadHandle(SWal *); SWalReadHandle *walOpenReadHandle(SWal *);
void walCloseReadHandle(SWalReadHandle *); void walCloseReadHandle(SWalReadHandle *);
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver); int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver);
int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **ppHead);
// deprecated // deprecated
#if 0 #if 0

View File

@ -255,6 +255,7 @@ void tmqClearUnhandleMsg(tmq_t* tmq) {
break; break;
} }
msg = NULL;
taosReadAllQitems(tmq->mqueue, tmq->qall); taosReadAllQitems(tmq->mqueue, tmq->qall);
while (1) { while (1) {
taosGetQitem(tmq->qall, (void**)&msg); taosGetQitem(tmq->qall, (void**)&msg);
@ -787,7 +788,7 @@ void tmqShowMsg(tmq_message_t* tmq_message) {
static bool noPrintSchema; static bool noPrintSchema;
char pBuf[128]; char pBuf[128];
SMqPollRsp* pRsp = &tmq_message->msg; SMqPollRsp* pRsp = &tmq_message->msg;
int32_t colNum = pRsp->schema->nCols; int32_t colNum = 2;
if (!noPrintSchema) { if (!noPrintSchema) {
printf("|"); printf("|");
for (int32_t i = 0; i < colNum; i++) { for (int32_t i = 0; i < colNum; i++) {
@ -838,6 +839,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch; int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
int32_t tmqEpoch = atomic_load_32(&tmq->epoch); int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
if (msgEpoch < tmqEpoch) { if (msgEpoch < tmqEpoch) {
/*printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);*/
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
tscWarn("discard rsp epoch %d, current epoch %d", msgEpoch, tmqEpoch); tscWarn("discard rsp epoch %d, current epoch %d", msgEpoch, tmqEpoch);
return 0; return 0;
@ -886,6 +888,9 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
goto WRITE_QUEUE_FAIL; goto WRITE_QUEUE_FAIL;
} }
tscError("tmq recv poll: vg %d, req offset %ld, rsp offset %ld", pParam->pVg->vgId, pRsp->msg.reqOffset,
pRsp->msg.rspOffset);
pRsp->vg = pParam->pVg; pRsp->vg = pParam->pVg;
taosWriteQitem(tmq->mqueue, pRsp); taosWriteQitem(tmq->mqueue, pRsp);
atomic_add_fetch_32(&tmq->readyRequest, 1); atomic_add_fetch_32(&tmq->readyRequest, 1);
@ -902,6 +907,7 @@ WRITE_QUEUE_FAIL:
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
/*printf("call update ep %d\n", epoch);*/ /*printf("call update ep %d\n", epoch);*/
/*printf("tmq update ep epoch %d to epoch %d\n", tmq->epoch, epoch);*/
bool set = false; bool set = false;
int32_t topicNumGet = taosArrayGetSize(pRsp->topics); int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
char vgKey[TSDB_TOPIC_FNAME_LEN + 22]; char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
@ -932,6 +938,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
for (int32_t k = 0; k < vgNumCur; k++) { for (int32_t k = 0; k < vgNumCur; k++) {
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k); SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k);
sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId); sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId);
/*printf("epoch %d vg %d build %s\n", epoch, pVgCur->vgId, vgKey);*/
taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t)); taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
} }
break; break;
@ -945,9 +952,12 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId); sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey)); int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
int64_t offset = pVgEp->offset; int64_t offset = pVgEp->offset;
/*printf("epoch %d vg %d offset og to %ld\n", epoch, pVgEp->vgId, offset);*/
if (pOffset != NULL) { if (pOffset != NULL) {
offset = *pOffset; offset = *pOffset;
/*printf("epoch %d vg %d found %s\n", epoch, pVgEp->vgId, vgKey);*/
} }
/*printf("epoch %d vg %d offset set to %ld\n", epoch, pVgEp->vgId, offset);*/
SMqClientVg clientVg = { SMqClientVg clientVg = {
.pollCnt = 0, .pollCnt = 0,
.currentOffset = offset, .currentOffset = offset,
@ -1195,6 +1205,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
if (vgStatus != TMQ_VG_STATUS__IDLE) { if (vgStatus != TMQ_VG_STATUS__IDLE) {
/*printf("skip vg %d\n", pVg->vgId);*/
continue; continue;
} }
SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg); SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
@ -1238,6 +1249,8 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
int64_t transporterId = 0; int64_t transporterId = 0;
/*printf("send poll\n");*/ /*printf("send poll\n");*/
atomic_add_fetch_32(&tmq->waitingRequest, 1); atomic_add_fetch_32(&tmq->waitingRequest, 1);
/*tscDebug("tmq send poll: vg %d, req offset %ld", pVg->vgId, pVg->currentOffset);*/
/*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
pVg->pollCnt++; pVg->pollCnt++;
tmq->pollCnt++; tmq->pollCnt++;

View File

@ -735,6 +735,9 @@ typedef struct {
int8_t createdBy; // STREAM_CREATED_BY__USER or SMA int8_t createdBy; // STREAM_CREATED_BY__USER or SMA
int32_t fixedSinkVgId; // 0 for shuffle int32_t fixedSinkVgId; // 0 for shuffle
int64_t smaId; // 0 for unused int64_t smaId; // 0 for unused
int8_t trigger;
int32_t triggerParam;
int64_t waterMark;
char* sql; char* sql;
char* logicalPlan; char* logicalPlan;
char* physicalPlan; char* physicalPlan;

View File

@ -194,7 +194,7 @@ void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version);
int tqCommit(STQ*); int tqCommit(STQ*);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId);
int32_t tqProcessSetConnReq(STQ* pTq, char* msg); int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
int32_t tqProcessRebReq(STQ* pTq, char* msg); int32_t tqProcessRebReq(STQ* pTq, char* msg);
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId); int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId);

View File

@ -27,5 +27,5 @@ void metaCloseUidGnrt(SMeta *pMeta) { /* TODO */
tb_uid_t metaGenerateUid(SMeta *pMeta) { tb_uid_t metaGenerateUid(SMeta *pMeta) {
// Generate a new table UID // Generate a new table UID
return ++(pMeta->uidGnrt.nextUid); return tGenIdPI32();
} }

View File

@ -250,7 +250,7 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
return 0; return 0;
} }
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqPollReq* pReq = pMsg->pCont; SMqPollReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId; int64_t consumerId = pReq->consumerId;
int64_t fetchOffset; int64_t fetchOffset;
@ -264,6 +264,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
fetchOffset = pReq->currentOffset + 1; fetchOffset = pReq->currentOffset + 1;
} }
/*printf("tmq poll vg %d req %ld %ld\n", pTq->pVnode->vgId, pReq->currentOffset, fetchOffset);*/
SMqPollRsp rsp = { SMqPollRsp rsp = {
/*.consumerId = consumerId,*/ /*.consumerId = consumerId,*/
.numOfTopics = 0, .numOfTopics = 0,
@ -288,37 +290,47 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
rsp.reqOffset = pReq->currentOffset; rsp.reqOffset = pReq->currentOffset;
rsp.skipLogNum = 0; rsp.skipLogNum = 0;
SWalHead* pHead;
while (1) { while (1) {
/*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/ /*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { SWalReadHead* pHead;
if (walReadWithHandle_s(pTopic->pReadhandle, fetchOffset, &pHead) < 0) {
// TODO: no more log, set timer to wait blocking time // TODO: no more log, set timer to wait blocking time
// if data inserted during waiting, launch query and // if data inserted during waiting, launch query and
// response to user // response to user
break; break;
} }
int8_t pos = fetchOffset % TQ_BUFFER_SIZE; /*printf("vg %d offset %ld msgType %d from epoch %d\n", pTq->pVnode->vgId, fetchOffset, pHead->msgType,
pHead = pTopic->pReadhandle->pHead; * pReq->epoch);*/
if (pHead->head.msgType == TDMT_VND_SUBMIT) { /*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
SSubmitReq* pCont = (SSubmitReq*)&pHead->head.body; /*pHead = pTopic->pReadhandle->pHead;*/
qTaskInfo_t task = pTopic->buffer.output[pos].task; if (pHead->msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
/*printf("from topic %s from consumer\n", pTopic->topicName, consumerId);*/
qTaskInfo_t task = pTopic->buffer.output[workerId].task;
ASSERT(task);
qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK); qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK);
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
while (1) { while (1) {
SSDataBlock* pDataBlock; SSDataBlock* pDataBlock = NULL;
uint64_t ts; uint64_t ts;
if (qExecTask(task, &pDataBlock, &ts) < 0) { if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(false); ASSERT(false);
} }
if (pDataBlock == NULL) { if (pDataBlock == NULL) {
fetchOffset++; /*pos = fetchOffset % TQ_BUFFER_SIZE;*/
pos = fetchOffset % TQ_BUFFER_SIZE;
rsp.skipLogNum++;
break; break;
} }
taosArrayPush(pRes, pDataBlock); taosArrayPush(pRes, pDataBlock);
rsp.schema = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper; }
if (taosArrayGetSize(pRes) == 0) {
fetchOffset++;
rsp.skipLogNum++;
taosArrayDestroy(pRes);
continue;
}
rsp.schema = pTopic->buffer.output[workerId].pReadHandle->pSchemaWrapper;
rsp.rspOffset = fetchOffset; rsp.rspOffset = fetchOffset;
rsp.numOfTopics = 1; rsp.numOfTopics = 1;
@ -328,6 +340,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
void* buf = rpcMallocCont(tlen); void* buf = rpcMallocCont(tlen);
if (buf == NULL) { if (buf == NULL) {
pMsg->code = -1; pMsg->code = -1;
taosMemoryFree(pHead);
return -1; return -1;
} }
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP; ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
@ -340,10 +353,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
pMsg->pCont = buf; pMsg->pCont = buf;
pMsg->contLen = tlen; pMsg->contLen = tlen;
pMsg->code = 0; pMsg->code = 0;
/*printf("vg %d offset %ld msgType %d from epoch %d actual rsp\n", pTq->pVnode->vgId, fetchOffset,
* pHead->msgType,*/
/*pReq->epoch);*/
tmsgSendRsp(pMsg); tmsgSendRsp(pMsg);
taosMemoryFree(pHead);
return 0; return 0;
}
} else { } else {
taosMemoryFree(pHead);
fetchOffset++; fetchOffset++;
rsp.skipLogNum++; rsp.skipLogNum++;
} }
@ -368,6 +385,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
pMsg->contLen = tlen; pMsg->contLen = tlen;
pMsg->code = 0; pMsg->code = 0;
tmsgSendRsp(pMsg); tmsgSendRsp(pMsg);
/*printf("vg %d offset %ld from epoch %d not rsp\n", pTq->pVnode->vgId, fetchOffset, pReq->epoch);*/
/*}*/ /*}*/
return 0; return 0;
@ -432,7 +450,9 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
}; };
pTopic->buffer.output[i].pReadHandle = pReadHandle; pTopic->buffer.output[i].pReadHandle = pReadHandle;
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle); pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle);
ASSERT(pTopic->buffer.output[i].task);
} }
printf("set topic %s to consumer %ld\n", pTopic->topicName, req.consumerId);
taosArrayPush(pConsumer->topics, pTopic); taosArrayPush(pConsumer->topics, pTopic);
tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer); tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer);
tqHandleCommit(pTq->tqMeta, req.consumerId); tqHandleCommit(pTq->tqMeta, req.consumerId);

View File

@ -168,6 +168,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
break; break;
} }
if (colDataAppend(pColData, curRow, sVal.val, false) < 0) { if (colDataAppend(pColData, curRow, sVal.val, false) < 0) {
/*if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) {*/
taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock); taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock);
return NULL; return NULL;
} }

View File

@ -1394,7 +1394,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDF
tsdbDebug("vgId:%d uid:%" PRId64 " a block of data is written to file %s, offset %" PRId64 tsdbDebug("vgId:%d uid:%" PRId64 " a block of data is written to file %s, offset %" PRId64
" numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64, " numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
REPO_ID(pRepo), TABLE_TID(pTable), TSDB_FILE_FULL_NAME(pDFile), offset, rowsToWrite, pBlock->len, REPO_ID(pRepo), TABLE_UID(pTable), TSDB_FILE_FULL_NAME(pDFile), offset, rowsToWrite, pBlock->len,
pBlock->numOfCols, pBlock->keyFirst, pBlock->keyLast); pBlock->numOfCols, pBlock->keyFirst, pBlock->keyLast);
return 0; return 0;

View File

@ -66,7 +66,7 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
case TDMT_VND_TABLE_META: case TDMT_VND_TABLE_META:
return vnodeGetTableMeta(pVnode, pMsg); return vnodeGetTableMeta(pVnode, pMsg);
case TDMT_VND_CONSUME: case TDMT_VND_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg); return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId);
case TDMT_VND_TASK_PIPE_EXEC: case TDMT_VND_TASK_PIPE_EXEC:
case TDMT_VND_TASK_MERGE_EXEC: case TDMT_VND_TASK_MERGE_EXEC:
return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, 0); return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, 0);

View File

@ -165,6 +165,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// } // }
break; break;
case TDMT_VND_SUBMIT: case TDMT_VND_SUBMIT:
/*printf("vnode %d write data %ld\n", pVnode->vgId, ver);*/
if (pVnode->config.streamMode == 0) { if (pVnode->config.streamMode == 0) {
if (tsdbInsertData(pVnode->pTsdb, (SSubmitReq *)ptr, NULL) < 0) { if (tsdbInsertData(pVnode->pTsdb, (SSubmitReq *)ptr, NULL) < 0) {
// TODO: handle error // TODO: handle error

View File

@ -719,6 +719,11 @@ static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExc
if (NULL == pScan->pScanCols) { if (NULL == pScan->pScanCols) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
} }
if (TSDB_CODE_SUCCESS == code) {
code = sortScanCols(pScan->pScanCols);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = sortScanCols(pScan->pScanCols); code = sortScanCols(pScan->pScanCols);
} }

View File

@ -86,7 +86,7 @@ void schFreeRpcCtx(SRpcCtx *pCtx) {
} }
} }
void schFreeTask(SSchTask* pTask) { void schFreeTask(SSchTask *pTask) {
if (pTask->candidateAddrs) { if (pTask->candidateAddrs) {
taosArrayDestroy(pTask->candidateAddrs); taosArrayDestroy(pTask->candidateAddrs);
} }
@ -126,11 +126,13 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
case TDMT_VND_QUERY_RSP: // query_rsp may be processed later than ready_rsp case TDMT_VND_QUERY_RSP: // query_rsp may be processed later than ready_rsp
if (lastMsgType != reqMsgType && -1 != lastMsgType && TDMT_VND_FETCH != lastMsgType) { if (lastMsgType != reqMsgType && -1 != lastMsgType && TDMT_VND_FETCH != lastMsgType) {
SCH_TASK_DLOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), TMSG_INFO(msgType)); SCH_TASK_DLOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
TMSG_INFO(msgType));
} }
if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) { if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
SCH_TASK_DLOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus), TMSG_INFO(msgType)); SCH_TASK_DLOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
TMSG_INFO(msgType));
} }
SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
@ -138,12 +140,14 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
case TDMT_VND_RES_READY_RSP: case TDMT_VND_RES_READY_RSP:
reqMsgType = TDMT_VND_QUERY; reqMsgType = TDMT_VND_QUERY;
if (lastMsgType != reqMsgType && -1 != lastMsgType) { if (lastMsgType != reqMsgType && -1 != lastMsgType) {
SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", (lastMsgType > 0 ? TMSG_INFO(lastMsgType) : "null"), TMSG_INFO(msgType)); SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s",
(lastMsgType > 0 ? TMSG_INFO(lastMsgType) : "null"), TMSG_INFO(msgType));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
} }
if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) { if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus), TMSG_INFO(msgType)); SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
TMSG_INFO(msgType));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
} }
@ -151,12 +155,14 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
case TDMT_VND_FETCH_RSP: case TDMT_VND_FETCH_RSP:
if (lastMsgType != reqMsgType && -1 != lastMsgType) { if (lastMsgType != reqMsgType && -1 != lastMsgType) {
SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), TMSG_INFO(msgType)); SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
TMSG_INFO(msgType));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
} }
if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) { if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus), TMSG_INFO(msgType)); SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
TMSG_INFO(msgType));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
} }
@ -171,12 +177,14 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
} }
if (lastMsgType != reqMsgType) { if (lastMsgType != reqMsgType) {
SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), TMSG_INFO(msgType)); SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
TMSG_INFO(msgType));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
} }
if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) { if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus), TMSG_INFO(msgType)); SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
TMSG_INFO(msgType));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
} }
@ -654,7 +662,8 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
if (SCH_IS_DATA_SRC_TASK(pTask)) { if (SCH_IS_DATA_SRC_TASK(pTask)) {
if (pTask->tryTimes >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) { if (pTask->tryTimes >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) {
*needRetry = false; *needRetry = false;
SCH_TASK_DLOG("task no more retry since all ep tried, tryTimes:%d, epNum:%d", pTask->tryTimes, SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)); SCH_TASK_DLOG("task no more retry since all ep tried, tryTimes:%d, epNum:%d", pTask->tryTimes,
SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} else { } else {
@ -662,7 +671,8 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
if ((pTask->candidateIdx + 1) >= candidateNum) { if ((pTask->candidateIdx + 1) >= candidateNum) {
*needRetry = false; *needRetry = false;
SCH_TASK_DLOG("task no more retry since all candiates tried, candidateIdx:%d, candidateNum:%d", pTask->candidateIdx, candidateNum); SCH_TASK_DLOG("task no more retry since all candiates tried, candidateIdx:%d, candidateNum:%d",
pTask->candidateIdx, candidateNum);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
@ -706,9 +716,8 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
memcpy(&hb->trans, trans, sizeof(*trans)); memcpy(&hb->trans, trans, sizeof(*trans));
SCH_UNLOCK(SCH_WRITE, &hb->lock); SCH_UNLOCK(SCH_WRITE, &hb->lock);
qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, instance:%p, handle:%p", qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, instance:%p, handle:%p", schMgmt.sId,
schMgmt.sId, epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->transInst, epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->transInst, trans->transHandle);
trans->transHandle);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -965,7 +974,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
int8_t status = 0; int8_t status = 0;
if (schJobNeedToStop(pJob, &status)) { if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_ELOG("rsp not processed cause of job status, job status:%s, rspCode:0x%x", jobTaskStatusStr(status), rspCode); SCH_TASK_ELOG("rsp not processed cause of job status, job status:%s, rspCode:0x%x", jobTaskStatusStr(status),
rspCode);
SCH_RET(atomic_load_32(&pJob->errCode)); SCH_RET(atomic_load_32(&pJob->errCode));
} }
@ -1023,7 +1033,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
} }
//SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY)); // SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY));
break; break;
} }
@ -1173,7 +1183,8 @@ int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
SCH_ERR_RET(schUpdateHbConnection(&rsp.epId, &trans)); SCH_ERR_RET(schUpdateHbConnection(&rsp.epId, &trans));
int32_t taskNum = (int32_t)taosArrayGetSize(rsp.taskStatus); int32_t taskNum = (int32_t)taosArrayGetSize(rsp.taskStatus);
qDebug("%d task status in hb rsp, nodeId:%d, fqdn:%s, port:%d", taskNum, rsp.epId.nodeId, rsp.epId.ep.fqdn, rsp.epId.ep.port); qDebug("%d task status in hb rsp, nodeId:%d, fqdn:%s, port:%d", taskNum, rsp.epId.nodeId, rsp.epId.ep.fqdn,
rsp.epId.ep.port);
for (int32_t i = 0; i < taskNum; ++i) { for (int32_t i = 0; i < taskNum; ++i) {
STaskStatus *taskStatus = taosArrayGet(rsp.taskStatus, i); STaskStatus *taskStatus = taosArrayGet(rsp.taskStatus, i);
@ -1188,7 +1199,8 @@ int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
// TODO // TODO
SCH_JOB_DLOG("TID:0x%" PRIx64 " task status in server: %s", taskStatus->taskId, jobTaskStatusStr(taskStatus->status)); SCH_JOB_DLOG("TID:0x%" PRIx64 " task status in server: %s", taskStatus->taskId,
jobTaskStatusStr(taskStatus->status));
schReleaseJob(taskStatus->refId); schReleaseJob(taskStatus->refId);
} }
@ -1219,7 +1231,6 @@ int32_t schHandleLinkBrokenCallback(void *param, const SDataBuf *pMsg, int32_t c
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
switch (msgType) { switch (msgType) {
case TDMT_VND_CREATE_TABLE: case TDMT_VND_CREATE_TABLE:
@ -1259,7 +1270,7 @@ void schFreeRpcCtxVal(const void *arg) {
return; return;
} }
SMsgSendInfo* pMsgSendInfo = (SMsgSendInfo *)arg; SMsgSendInfo *pMsgSendInfo = (SMsgSendInfo *)arg;
taosMemoryFreeClear(pMsgSendInfo->param); taosMemoryFreeClear(pMsgSendInfo->param);
taosMemoryFreeClear(pMsgSendInfo); taosMemoryFreeClear(pMsgSendInfo);
} }
@ -1301,10 +1312,9 @@ int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb) { int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb) {
int32_t code = 0; int32_t code = 0;
SMsgSendInfo* pMsgSendInfo = NULL; SMsgSendInfo *pMsgSendInfo = NULL;
pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) { if (NULL == pMsgSendInfo) {
@ -1342,7 +1352,7 @@ _return:
int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
int32_t code = 0; int32_t code = 0;
SSchTaskCallbackParam *param = NULL; SSchTaskCallbackParam *param = NULL;
SMsgSendInfo* pMsgSendInfo = NULL; SMsgSendInfo *pMsgSendInfo = NULL;
pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK); pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
if (NULL == pCtx->args) { if (NULL == pCtx->args) {
@ -1396,7 +1406,7 @@ _return:
int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
int32_t code = 0; int32_t code = 0;
SSchHbCallbackParam *param = NULL; SSchHbCallbackParam *param = NULL;
SMsgSendInfo* pMsgSendInfo = NULL; SMsgSendInfo *pMsgSendInfo = NULL;
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SQueryNodeEpId epId = {0}; SQueryNodeEpId epId = {0};
@ -1450,7 +1460,6 @@ _return:
SCH_RET(code); SCH_RET(code);
} }
int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId, bool *exist) { int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId, bool *exist) {
int32_t code = 0; int32_t code = 0;
SSchHbTrans hb = {0}; SSchHbTrans hb = {0};
@ -1475,8 +1484,6 @@ int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schCloneCallbackParam(SSchCallbackParamHeader *pSrc, SSchCallbackParamHeader **pDst) { int32_t schCloneCallbackParam(SSchCallbackParamHeader *pSrc, SSchCallbackParamHeader **pDst) {
if (pSrc->isHbParam) { if (pSrc->isHbParam) {
SSchHbCallbackParam *dst = taosMemoryMalloc(sizeof(SSchHbCallbackParam)); SSchHbCallbackParam *dst = taosMemoryMalloc(sizeof(SSchHbCallbackParam));
@ -1568,8 +1575,8 @@ _return:
SCH_RET(code); SCH_RET(code);
} }
int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet *epSet, int32_t msgType, void *msg,
int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet* epSet, int32_t msgType, void *msg, uint32_t msgSize, bool persistHandle, SRpcCtx *ctx) { uint32_t msgSize, bool persistHandle, SRpcCtx *ctx) {
int32_t code = 0; int32_t code = 0;
SSchTrans *trans = (SSchTrans *)transport; SSchTrans *trans = (SSchTrans *)transport;
@ -1601,9 +1608,9 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet*
pMsgSendInfo->msgType = msgType; pMsgSendInfo->msgType = msgType;
pMsgSendInfo->fp = fp; pMsgSendInfo->fp = fp;
qDebug("start to send %s msg to node[%d,%s,%d], refId:%" PRIx64 "instance:%p, handle:%p", qDebug("start to send %s msg to node[%d,%s,%d], refId:%" PRIx64 "instance:%p, handle:%p", TMSG_INFO(msgType),
TMSG_INFO(msgType), ntohl(((SMsgHead *)msg)->vgId), epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, ntohl(((SMsgHead *)msg)->vgId), epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, pJob->refId,
pJob->refId, trans->transInst, trans->transHandle); trans->transInst, trans->transHandle);
int64_t transporterId = 0; int64_t transporterId = 0;
code = asyncSendMsgToServerExt(trans->transInst, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx); code = asyncSendMsgToServerExt(trans->transInst, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx);
@ -1628,13 +1635,14 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) {
SSchTrans trans = {0}; SSchTrans trans = {0};
int32_t msgType = TDMT_VND_QUERY_HEARTBEAT; int32_t msgType = TDMT_VND_QUERY_HEARTBEAT;
req.header.vgId = htonl(nodeEpId->nodeId); req.header.vgId = nodeEpId->nodeId;
req.sId = schMgmt.sId; req.sId = schMgmt.sId;
memcpy(&req.epId, nodeEpId, sizeof(SQueryNodeEpId)); memcpy(&req.epId, nodeEpId, sizeof(SQueryNodeEpId));
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, nodeEpId, sizeof(SQueryNodeEpId)); SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, nodeEpId, sizeof(SQueryNodeEpId));
if (NULL == hb) { if (NULL == hb) {
qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", nodeEpId->nodeId, nodeEpId->ep.fqdn, nodeEpId->ep.port); qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", nodeEpId->nodeId, nodeEpId->ep.fqdn,
nodeEpId->ep.port);
SCH_ERR_RET(code); SCH_ERR_RET(code);
} }
@ -1689,12 +1697,13 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) {
SEpSet epSet = {.inUse = 0, .numOfEps = 1}; SEpSet epSet = {.inUse = 0, .numOfEps = 1};
memcpy(&epSet.eps[0], &nodeEpId->ep, sizeof(nodeEpId->ep)); memcpy(&epSet.eps[0], &nodeEpId->ep, sizeof(nodeEpId->ep));
qDebug("start to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d", trans.transInst, trans.transHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port); qDebug("start to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d", trans.transInst, trans.transHandle,
nodeEpId->ep.fqdn, nodeEpId->ep.port);
code = asyncSendMsgToServerExt(trans.transInst, &epSet, &transporterId, pMsgSendInfo, true, &rpcCtx); code = asyncSendMsgToServerExt(trans.transInst, &epSet, &transporterId, pMsgSendInfo, true, &rpcCtx);
if (code) { if (code) {
qError("fail to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d, error:%x - %s", qError("fail to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d, error:%x - %s", trans.transInst,
trans.transInst, trans.transHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port, code, tstrerror(code)); trans.transHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port, code, tstrerror(code));
SCH_ERR_JRET(code); SCH_ERR_JRET(code);
} }
@ -1856,7 +1865,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
SCH_SET_TASK_LASTMSG_TYPE(pTask, msgType); SCH_SET_TASK_LASTMSG_TYPE(pTask, msgType);
SSchTrans trans = {.transInst = pJob->transport, .transHandle = SCH_GET_TASK_HANDLE(pTask)}; SSchTrans trans = {.transInst = pJob->transport, .transHandle = SCH_GET_TASK_HANDLE(pTask)};
SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, &epSet, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL))); SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, &epSet, msgType, msg, msgSize, persistHandle,
(rpcCtx.args ? &rpcCtx : NULL)));
if (msgType == TDMT_VND_QUERY) { if (msgType == TDMT_VND_QUERY) {
SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr, trans.transHandle)); SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr, trans.transHandle));

View File

@ -38,7 +38,7 @@ static int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg*
req.taskId = pTask->fixedEpDispatcher.taskId; req.taskId = pTask->fixedEpDispatcher.taskId;
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
// TODO fix tbname issue // TODO use general name rule of schemaless
char ctbName[TSDB_TABLE_FNAME_LEN + 22]; char ctbName[TSDB_TABLE_FNAME_LEN + 22];
// all groupId must be the same in an array // all groupId must be the same in an array
SSDataBlock* pBlock = taosArrayGet(data, 0); SSDataBlock* pBlock = taosArrayGet(data, 0);

View File

@ -30,6 +30,9 @@ SWalReadHandle *walOpenReadHandle(SWal *pWal) {
pRead->curFileFirstVer = -1; pRead->curFileFirstVer = -1;
pRead->capacity = 0; pRead->capacity = 0;
pRead->status = 0; pRead->status = 0;
taosThreadMutexInit(&pRead->mutex, NULL);
pRead->pHead = taosMemoryMalloc(sizeof(SWalHead)); pRead->pHead = taosMemoryMalloc(sizeof(SWalHead));
if (pRead->pHead == NULL) { if (pRead->pHead == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
@ -135,6 +138,22 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
return 0; return 0;
} }
int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **ppHead) {
taosThreadMutexLock(&pRead->mutex);
if (walReadWithHandle(pRead, ver) < 0) {
taosThreadMutexUnlock(&pRead->mutex);
return -1;
}
*ppHead = taosMemoryMalloc(sizeof(SWalReadHead) + pRead->pHead->head.len);
if (*ppHead == NULL) {
taosThreadMutexUnlock(&pRead->mutex);
return -1;
}
memcpy(*ppHead, &pRead->pHead->head, sizeof(SWalReadHead) + pRead->pHead->head.len);
taosThreadMutexUnlock(&pRead->mutex);
return 0;
}
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
int code; int code;
// TODO: check wal life // TODO: check wal life
@ -145,7 +164,9 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
} }
} }
if (!taosValidFile(pRead->pReadLogTFile)) return -1; if (!taosValidFile(pRead->pReadLogTFile)) {
return -1;
}
code = taosReadFile(pRead->pReadLogTFile, pRead->pHead, sizeof(SWalHead)); code = taosReadFile(pRead->pReadLogTFile, pRead->pHead, sizeof(SWalHead));
if (code != sizeof(SWalHead)) { if (code != sizeof(SWalHead)) {

View File

@ -121,7 +121,8 @@ int32_t tjsonAddItem(SJson* pJson, FToJson func, const void* pObj) {
return tjsonAddItemToArray(pJson, pJobj); return tjsonAddItemToArray(pJson, pJobj);
} }
int32_t tjsonAddArray(SJson* pJson, const char* pName, FToJson func, const void* pArray, int32_t itemSize, int32_t num) { int32_t tjsonAddArray(SJson* pJson, const char* pName, FToJson func, const void* pArray, int32_t itemSize,
int32_t num) {
if (num > 0) { if (num > 0) {
SJson* pJsonArray = tjsonAddArrayToObject(pJson, pName); SJson* pJsonArray = tjsonAddArrayToObject(pJson, pName);
if (NULL == pJsonArray) { if (NULL == pJsonArray) {

View File

@ -226,7 +226,7 @@ void loop_consume(tmq_t* tmq) {
int32_t totalRows = 0; int32_t totalRows = 0;
int32_t skipLogNum = 0; int32_t skipLogNum = 0;
while (running) { while (running) {
tmq_message_t* tmqMsg = tmq_consumer_poll(tmq, 6000); tmq_message_t* tmqMsg = tmq_consumer_poll(tmq, 3000);
if (tmqMsg) { if (tmqMsg) {
totalMsgs++; totalMsgs++;