refactor: do some internal refactor.
This commit is contained in:
parent
6c641cff39
commit
8fac91e265
|
@ -180,7 +180,6 @@ int32_t tqStreamTasksScanWal(STQ* pTq);
|
||||||
|
|
||||||
// tq util
|
// tq util
|
||||||
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
|
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
|
||||||
void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId);
|
|
||||||
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver);
|
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver);
|
||||||
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
|
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
|
||||||
|
|
||||||
|
|
|
@ -112,10 +112,6 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// check if offset value exists
|
|
||||||
char key[128] = {0};
|
|
||||||
createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId);
|
|
||||||
|
|
||||||
if (tInputQueueIsFull(pTask)) {
|
if (tInputQueueIsFull(pTask)) {
|
||||||
tqDebug("vgId:%d s-task:%s input queue is full, do nothing", vgId, pTask->id.idStr);
|
tqDebug("vgId:%d s-task:%s input queue is full, do nothing", vgId, pTask->id.idStr);
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
|
|
|
@ -25,21 +25,6 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
|
||||||
return taosStrdup(buf);
|
return taosStrdup(buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
// stream_task:stream_id:task_id
|
|
||||||
void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId) {
|
|
||||||
int32_t n = 12;
|
|
||||||
char* p = dst;
|
|
||||||
|
|
||||||
memcpy(p, "stream_task:", n);
|
|
||||||
p += n;
|
|
||||||
|
|
||||||
int32_t inc = tintToHex(streamId, p);
|
|
||||||
p += inc;
|
|
||||||
|
|
||||||
*(p++) = ':';
|
|
||||||
tintToHex(taskId, p);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver) {
|
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver) {
|
||||||
int32_t code = tAppendDataToInputQueue(pTask, pQueueItem);
|
int32_t code = tAppendDataToInputQueue(pTask, pQueueItem);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
|
|
Loading…
Reference in New Issue