TD-12678 datasink interface adjust
This commit is contained in:
parent
d7ddb1589b
commit
25a48e4006
|
@ -24,9 +24,9 @@ extern "C" {
|
||||||
#include "executorimpl.h"
|
#include "executorimpl.h"
|
||||||
|
|
||||||
#define DS_CAPACITY_ENOUGH 1
|
#define DS_CAPACITY_ENOUGH 1
|
||||||
#define DS_CAPACITY_FULL 2
|
#define DS_DATA_FULL 2
|
||||||
#define DS_NEED_SCHEDULE 3
|
#define DS_NEED_SCHEDULE 3
|
||||||
#define DS_END 4
|
#define DS_QUERY_END 4
|
||||||
#define DS_IN_PROCESS 5
|
#define DS_IN_PROCESS 5
|
||||||
|
|
||||||
struct SDataSink;
|
struct SDataSink;
|
||||||
|
|
|
@ -124,7 +124,8 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
|
||||||
|
|
||||||
static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
|
static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
|
||||||
pthread_mutex_lock(&pDispatcher->mutex);
|
pthread_mutex_lock(&pDispatcher->mutex);
|
||||||
int32_t status = taosQueueSize(pDispatcher->pDataBlocks) < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_CAPACITY_ENOUGH : DS_CAPACITY_FULL;
|
int32_t status = taosQueueSize(pDispatcher->pDataBlocks) < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery
|
||||||
|
? DS_CAPACITY_ENOUGH : DS_DATA_FULL;
|
||||||
pDispatcher->status = status;
|
pDispatcher->status = status;
|
||||||
pthread_mutex_unlock(&pDispatcher->mutex);
|
pthread_mutex_unlock(&pDispatcher->mutex);
|
||||||
return status;
|
return status;
|
||||||
|
@ -152,14 +153,14 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput,
|
||||||
static void endPut(struct SDataSinkHandle* pHandle) {
|
static void endPut(struct SDataSinkHandle* pHandle) {
|
||||||
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
||||||
pthread_mutex_lock(&pDispatcher->mutex);
|
pthread_mutex_lock(&pDispatcher->mutex);
|
||||||
pDispatcher->status = DS_END;
|
pDispatcher->status = DS_QUERY_END;
|
||||||
pthread_mutex_unlock(&pDispatcher->mutex);
|
pthread_mutex_unlock(&pDispatcher->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getDataLength(SDataSinkHandle* pHandle, int32_t* pStatus) {
|
static int32_t getDataLength(SDataSinkHandle* pHandle, int32_t* pStatus) {
|
||||||
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
||||||
if (taosQueueEmpty(pDispatcher->pDataBlocks)) {
|
if (taosQueueEmpty(pDispatcher->pDataBlocks)) {
|
||||||
*pStatus = getStatus(pDispatcher) ? DS_END : DS_IN_PROCESS;
|
*pStatus = (DS_QUERY_END == getStatus(pDispatcher) ? DS_QUERY_END : DS_IN_PROCESS);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
SDataDispatchBuf* pBuf = NULL;
|
SDataDispatchBuf* pBuf = NULL;
|
||||||
|
|
Loading…
Reference in New Issue