E - Type of the data in the queue.
public abstract class QueueBatchUniqueProcessor<E> extends QueueConsumer<E>
One working thread is created for each instance of this class when necessary.
For each batch, as much data as possible is taken from the queue. The maximum size of a batch and the maximum time to wait for new data are configurable. When either limit is reached, all data in the current batch is processed, and data taken afterwards goes into later batches. Duplicates within a batch are discarded: if the same data is taken from the queue more than once in a batch, only one instance of it is processed.
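The batching behaviour described above can be sketched roughly as follows. This is only an illustration, not the implementation of this class: `UniqueBatchSketch` and `takeBatch` are hypothetical names, and the sketch assumes that the size limit counts elements taken from the queue (before duplicates are dropped) and that the wait time applies to each poll for a further element.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; not part of the documented API.
final class UniqueBatchSketch {

    /**
     * Collects one batch: blocks until a first element is available, then keeps
     * polling until maxBatchSize elements have been taken or no further element
     * arrives within the timeout. Duplicates collapse because the batch is a Set.
     */
    static <E> Set<E> takeBatch(BlockingQueue<E> queue, int maxBatchSize,
                                long pollTimeout, TimeUnit pollTimeoutUnit)
            throws InterruptedException {
        Set<E> batch = new LinkedHashSet<>();
        int taken = 0;
        E item = queue.take(); // wait for the first element of the batch
        while (item != null) {
            batch.add(item); // a duplicate is silently ignored by the Set
            taken++;
            if (taken >= maxBatchSize) {
                break; // size limit reached; the rest goes into later batches
            }
            // A timeout of 0 returns immediately when the queue is empty.
            item = queue.poll(pollTimeout, pollTimeoutUnit);
        }
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue =
                new LinkedBlockingQueue<>(Arrays.asList("a", "b", "a", "c", "d"));
        // With a limit of 4, the first batch takes a, b, a, c; the second "a" is dropped.
        System.out.println(takeBatch(queue, 4, 0, TimeUnit.MILLISECONDS)); // prints [a, b, c]
        System.out.println(takeBatch(queue, 4, 0, TimeUnit.MILLISECONDS)); // prints [d]
    }
}
```

Using a `Set` for the batch is what makes deduplication a side effect of collection, which matches the `process(Set)` signature.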
Modifier and Type | Field and Description |
---|---|
protected int | maxBatchSize |
protected long | pollTimeout |
protected TimeUnit | pollTimeoutUnit |
Fields inherited from class QueueConsumer: defaultThreadPool, mode, MODE_INIT, MODE_RUNNING, MODE_START, MODE_STOP_ASAP, MODE_STOP_WHEN_EMPTY, MODE_STOPPED, name, queue, thread, threadPool
Constructor and Description |
---|
QueueBatchUniqueProcessor(BlockingQueue<E> workQueue, int batchSize)
Creates an instance that uses the default name (QueueConsumer.class.getSimpleName()) and the default thread pool, and does not wait for new data. |
QueueBatchUniqueProcessor(BlockingQueue<E> workQueue, int batchSize, long batchWaitTimeout, TimeUnit timeoutUnit)
Creates an instance with the default name, QueueConsumer.class.getSimpleName(). |
QueueBatchUniqueProcessor(BlockingQueue<E> workQueue, String name, ExecutorService executorService, int batchSize)
Creates an instance that does not wait for new data. |
QueueBatchUniqueProcessor(BlockingQueue<E> workQueue, String name, ExecutorService executorService, int batchSize, long batchWaitTimeout, TimeUnit timeoutUnit)
Creates an instance. |
QueueBatchUniqueProcessor(BlockingQueue<E> workQueue, String name, int batchSize)
Creates an instance that uses the default thread pool and does not wait for new data. |
QueueBatchUniqueProcessor(BlockingQueue<E> workQueue, String name, int batchSize, long batchWaitTimeout, TimeUnit timeoutUnit)
Creates an instance that uses the default thread pool. |
QueueBatchUniqueProcessor(int batchSize)
Creates an instance that uses the default name (QueueConsumer.class.getSimpleName()) and the default thread pool, and does not wait for new data. |
QueueBatchUniqueProcessor(int batchSize, long batchWaitTimeout, TimeUnit timeoutUnit)
Creates an instance with the default name, QueueConsumer.class.getSimpleName(). |
QueueBatchUniqueProcessor(String name, ExecutorService executorService, int batchSize)
Creates an instance that does not wait for new data. |
QueueBatchUniqueProcessor(String name, ExecutorService executorService, int batchSize, long batchWaitTimeout, TimeUnit timeoutUnit)
Creates an instance. |
QueueBatchUniqueProcessor(String name, int batchSize)
Creates an instance that uses the default thread pool and does not wait for new data. |
QueueBatchUniqueProcessor(String name, int batchSize, long batchWaitTimeout, TimeUnit timeoutUnit)
Creates an instance that uses the default thread pool. |
Modifier and Type | Method and Description |
---|---|
protected void |
consume()
Overrides the parent class implementation so that a batch of data is taken from the queue and process(Set) is invoked. |
abstract void |
process(Set<E> batch)
Processes one batch of data. Subclasses must implement this method. |
protected int maxBatchSize
protected long pollTimeout
protected TimeUnit pollTimeoutUnit
public QueueBatchUniqueProcessor(BlockingQueue<E> workQueue, String name, ExecutorService executorService, int batchSize, long batchWaitTimeout, TimeUnit timeoutUnit)
name - Name of this instance, which determines the name of the working thread.
workQueue - The queue from which data for processing is fetched.
executorService - Thread pool from which the working thread is obtained.
batchSize - Maximum size allowed for a batch; remaining data is put into later batches.
batchWaitTimeout - Maximum time to wait for new data from the queue before the current batch is processed; 0 means no waiting.
timeoutUnit - Unit of the batchWaitTimeout parameter.
public QueueBatchUniqueProcessor(String name, ExecutorService executorService, int batchSize, long batchWaitTimeout, TimeUnit timeoutUnit)
name - Name of this instance, which determines the name of the working thread.
executorService - Thread pool from which the working thread is obtained.
batchSize - Maximum size allowed for a batch; remaining data is put into later batches.
batchWaitTimeout - Maximum time to wait for new data from the queue before the current batch is processed; 0 means no waiting.
timeoutUnit - Unit of the batchWaitTimeout parameter.
public QueueBatchUniqueProcessor(BlockingQueue<E> workQueue, String name, int batchSize, long batchWaitTimeout, TimeUnit timeoutUnit)
name - Name of this instance, which determines the name of the working thread.
workQueue - The queue from which data for processing is fetched.
batchSize - Maximum size allowed for a batch; remaining data is put into later batches.
batchWaitTimeout - Maximum time to wait for new data from the queue before the current batch is processed; 0 means no waiting.
timeoutUnit - Unit of the batchWaitTimeout parameter.
public QueueBatchUniqueProcessor(String name, int batchSize, long batchWaitTimeout, TimeUnit timeoutUnit)
name - Name of this instance, which determines the name of the working thread.
batchSize - Maximum size allowed for a batch; remaining data is put into later batches.
batchWaitTimeout - Maximum time to wait for new data from the queue before the current batch is processed; 0 means no waiting.
timeoutUnit - Unit of the batchWaitTimeout parameter.
public QueueBatchUniqueProcessor(BlockingQueue<E> workQueue, int batchSize, long batchWaitTimeout, TimeUnit timeoutUnit)
workQueue - The queue from which data for processing is fetched.
batchSize - Maximum size allowed for a batch; remaining data is put into later batches.
batchWaitTimeout - Maximum time to wait for new data from the queue before the current batch is processed; 0 means no waiting.
timeoutUnit - Unit of the batchWaitTimeout parameter.
public QueueBatchUniqueProcessor(int batchSize, long batchWaitTimeout, TimeUnit timeoutUnit)
batchSize - Maximum size allowed for a batch; remaining data is put into later batches.
batchWaitTimeout - Maximum time to wait for new data from the queue before the current batch is processed; 0 means no waiting.
timeoutUnit - Unit of the batchWaitTimeout parameter.
public QueueBatchUniqueProcessor(BlockingQueue<E> workQueue, String name, ExecutorService executorService, int batchSize)
name - Name of this instance, which determines the name of the working thread.
workQueue - The queue from which data for processing is fetched.
executorService - Thread pool from which the working thread is obtained.
batchSize - Maximum size allowed for a batch; remaining data is put into later batches.
public QueueBatchUniqueProcessor(String name, ExecutorService executorService, int batchSize)
name - Name of this instance, which determines the name of the working thread.
executorService - Thread pool from which the working thread is obtained.
batchSize - Maximum size allowed for a batch; remaining data is put into later batches.
public QueueBatchUniqueProcessor(BlockingQueue<E> workQueue, String name, int batchSize)
name - Name of this instance, which determines the name of the working thread.
workQueue - The queue from which data for processing is fetched.
batchSize - Maximum size allowed for a batch; remaining data is put into later batches.
public QueueBatchUniqueProcessor(String name, int batchSize)
name - Name of this instance, which determines the name of the working thread.
batchSize - Maximum size allowed for a batch; remaining data is put into later batches.
public QueueBatchUniqueProcessor(BlockingQueue<E> workQueue, int batchSize)
workQueue - The queue from which data for processing is fetched.
batchSize - Maximum size allowed for a batch; remaining data is put into later batches.
public QueueBatchUniqueProcessor(int batchSize)
batchSize - Maximum size allowed for a batch; remaining data is put into later batches.
protected void consume()
Overrides the parent class implementation so that a batch of data is taken from the queue and process(Set) is invoked.
Overrides: consume in class QueueConsumer<E>
public abstract void process(Set<E> batch)
This method may be interrupted while running, so make sure the following cases are handled correctly:
If the working thread is blocked in an invocation of the wait(), wait(long), or wait(long, int) methods of the Object class, or of the join(), join(long), join(long, int), sleep(long), or sleep(long, int) methods of the Thread class, its interrupt status will be cleared and it will receive an InterruptedException.
If the working thread is blocked in an I/O operation upon an interruptible channel, the channel will be closed, the thread's interrupt status will be set, and the thread will receive a ClosedByInterruptException.
If the working thread is blocked in a Selector, the thread's interrupt status will be set and it will return immediately from the selection operation, possibly with a non-zero value, just as if the selector's wakeup method were invoked.
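One common way to handle the first case inside a process(Set) implementation is sketched below. It is an illustration only: `InterruptAwareProcessing` and `processBatch` are hypothetical names, `Thread.sleep(1)` stands in for real blocking work, and the sketch assumes that the code calling process(Set) inspects the thread's interrupt status, which this page does not confirm.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the body of a process(Set) override.
final class InterruptAwareProcessing {

    /**
     * Handles the items of a batch one by one and returns those that were handled.
     * Stops early when the thread is interrupted and leaves the interrupt
     * status set so that the caller can still observe it.
     */
    static List<String> processBatch(Set<String> batch) {
        List<String> handled = new ArrayList<>();
        for (String item : batch) {
            if (Thread.currentThread().isInterrupted()) {
                break; // interrupted between items: stop, status stays set
            }
            try {
                Thread.sleep(1); // placeholder for blocking work on the item
                handled.add(item);
            } catch (InterruptedException e) {
                // sleep() cleared the interrupt status; restore it before leaving
                Thread.currentThread().interrupt();
                break;
            }
        }
        return handled;
    }
}
```

Restoring the status instead of swallowing the exception keeps the interruption visible to whatever code runs next on the same thread.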
batch - The data taken from the queue, which needs to be processed.
Copyright © 2014. All Rights Reserved.