import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

public class MyProducer {
    private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";
    private static final String TOPIC = "disTopic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Part 1: set the producer properties
        Properties props = new Properties();
        // Addresses (host:port) of the Kafka brokers
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // Serializer class for the key
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Serializer class for the value
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        // One count per asynchronous send below
        CountDownLatch latch = new CountDownLatch(5);
        for (int i = 0; i < 5; i++) {
            // Part 2: build the message
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);
            // Part 3: send the message (each record is sent three times to show all three modes)
            // One-way send: the broker's response is ignored.
            producer.send(record);
            System.out.println("message " + i + " sent");
            // Synchronous send: blocks the current thread until the broker responds.
            RecordMetadata recordMetadata = producer.send(record).get();
            String topic = recordMetadata.topic();
            int partition = recordMetadata.partition();
            long offset = recordMetadata.offset();
            String message = recordMetadata.toString();
            System.out.println("message:[" + message + "] sent with topic:" + topic + "; partition:" + partition + ";offset:" + offset);
            // Asynchronous send: does not block; the callback runs once the broker responds.
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (null != e) {
                        System.out.println("failed to send message, " + e.getMessage());
                        e.printStackTrace();
                    } else {
                        System.out.println("message:[" + metadata + "] sent with topic:" + metadata.topic() + ";offset:" + metadata.offset());
                    }
                    latch.countDown();
                }
            });
        }
        // Close the producer only after every callback has fired.
        latch.await();
        producer.close();
    }
}
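- Set the core Producer properties: all Producer options are managed by the ProducerConfig class. For example, ProducerConfig.BOOTSTRAP_SERVERS_CONFIG specifies which Kafka cluster the producer sends messages to, and it is required for every Producer. For most of the important properties, ProducerConfig also defines a matching DOC constant that describes them.
- Build the message: a Kafka message is a key-value structure. Both the key and the value can be any object type. The key is mainly used to choose the Partition, while the value is what the business logic cares about.
- Send the message with the Producer: the three commonly used modes are one-way (fire-and-forget) send, synchronous send, and asynchronous send.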
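import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class MyConsumer {
    private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";
    private static final String TOPIC = "disTopic";

    public static void main(String[] args) {
        // Part 1: set the consumer properties
        Properties props = new Properties();
        // Kafka broker addresses
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // Every consumer must specify a group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // Deserializer class for the key
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Deserializer class for the value
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(TOPIC));
        while (true) {
            // Part 2: pull messages, waiting at most 100 milliseconds
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            // Part 3: process the messages
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("offset = " + record.offset() + ";key = " + record.key() + "; value= " + record.value());
            }
            // Commit the offset so these messages are not consumed again.
            // Synchronous commit: wait until the commit completes before consuming the next batch.
            consumer.commitSync();
            // Asynchronous commit: send the commit request and start on the next batch
            // without waiting for the Broker's confirmation.
            // consumer.commitAsync();
        }
    }
}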
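- Set the core Consumer properties: all options are managed by the ConsumerConfig class. As with the producer, most of the important properties have a matching DOC constant that describes them, and BOOTSTRAP_SERVERS_CONFIG is again required.
- Pull messages: Kafka uses a pull model in which the Consumer actively fetches batches of the messages it is interested in from the Broker.
- Process messages and commit the offset: once a batch has been pulled, the business logic can process it. The consumer must then commit the offset to the Broker. If the offset is not committed, the Broker has no record that the batch was processed, so after a restart or rebalance the consumer group resumes from the last committed offset and receives the same messages again.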
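The effect of committing, or not committing, an offset can be illustrated without a broker. The sketch below is a hypothetical in-memory model (the class `OffsetCommitSketch` and its methods are invented for illustration and are not part of the Kafka client): a partition is a list, and a consumer that starts up always resumes from the last committed offset.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetCommitSketch {
    /** A single partition modeled as an append-only list of values. */
    private final List<String> log = List.of("m0", "m1", "m2", "m3", "m4");

    /** Committed position for the group: index of the next record to read. */
    private int committedOffset = 0;

    /** Fetch up to maxRecords starting at the committed offset, as a (re)started consumer would. */
    List<String> pollFromCommitted(int maxRecords) {
        int end = Math.min(log.size(), committedOffset + maxRecords);
        return new ArrayList<>(log.subList(committedOffset, end));
    }

    /** Record that everything before nextOffset has been processed. */
    void commit(int nextOffset) {
        committedOffset = nextOffset;
    }

    public static void main(String[] args) {
        OffsetCommitSketch group = new OffsetCommitSketch();
        // The consumer processes a batch but stops before committing.
        System.out.println(group.pollFromCommitted(3)); // [m0, m1, m2]
        // After a restart the same batch is delivered again.
        System.out.println(group.pollFromCommitted(3)); // [m0, m1, m2]
        // Once the offset is committed, the next poll moves on.
        group.commit(3);
        System.out.println(group.pollFromCommitted(3)); // [m3, m4]
    }
}
```

This is why processing followed by a commit gives at-least-once delivery: a failure between the two steps leads to the batch being read a second time.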
Official Kafka configuration reference: Apache Kafka
public static final String GROUP_ID_CONFIG = "group.id";
// Roughly: a unique string that identifies the consumer group. It is required if the consumer uses
// group management via subscribe(topic) or the Kafka-based offset management strategy.
public static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using <code>subscribe(topic)</code> or the Kafka-based offset management strategy.";
./kafka-consumer-groups.sh --bootstrap-server worker1:9092 --describe --group test
public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
public static final String INTERCEPTOR_CLASSES_DOC = "A list of classes to use as interceptors. "
        + "Implementing the <code>org.apache.kafka.clients.producer.ProducerInterceptor</code> interface allows you to intercept (and possibly mutate) the records "
        + "received by the producer before they are published to the Kafka cluster. By default, there are no interceptors.";
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;

public class MyInterceptor implements ProducerInterceptor<String, String> {
    // Called when a message is sent
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        System.out.println("producerRecord : " + producerRecord);
        return producerRecord;
    }

    // Called when the broker's response arrives
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        System.out.println("acknowledgement recordMetadata:" + recordMetadata);
    }

    // Called when the producer is closed
    @Override
    public void close() {
        System.out.println("producer closed");
    }

    // Receives the producer configuration
    @Override
    public void configure(Map<String, ?> map) {
        System.out.println("=====config start======");
        for (Map.Entry<String, ?> entry : map.entrySet()) {
            System.out.println("entry.key:" + entry.getKey() + " === entry.value: " + entry.getValue());
        }
        System.out.println("=====config end======");
    }
}
public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
public static final String KEY_SERIALIZER_CLASS_DOC = "Serializer class for key that implements the <code>org.apache.kafka.common.serialization.Serializer</code> interface.";
public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
public static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>org.apache.kafka.common.serialization.Serializer</code> interface.";
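- The key is optional and is used for partitioning. Kafka uses the key to decide which Partition a message is dispatched to.
- The value is the message content the business cares about. Kafka likewise passes the value object through the Serializer interface to convert it into a byte[] array, so that the value can be transmitted over the network and written to files on disk.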
public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
public static final String KEY_DESERIALIZER_CLASS_DOC = "Deserializer class for key that implements the <code>org.apache.kafka.common.serialization.Deserializer</code> interface.";
public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
public static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>org.apache.kafka.common.serialization.Deserializer</code> interface.";
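To make the serializer and deserializer roles concrete, here is a minimal sketch of the round trip a String key or value goes through. It does not use the Kafka interfaces: `StringSerdeSketch` and its two methods are hypothetical stand-ins, and the choice of UTF-8 is an assumption made for this example.

```java
import java.nio.charset.StandardCharsets;

public class StringSerdeSketch {
    /** Producer side: turn the object into bytes that can be sent and stored. */
    static byte[] serialize(String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    /** Consumer side: rebuild the object from the stored bytes. */
    static String deserialize(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("MyProducer0");
        System.out.println(wire.length);       // 11
        System.out.println(deserialize(wire)); // MyProducer0
    }
}
```

The producer's serializer and the consumer's deserializer must agree on the format; pairing StringSerializer with StringDeserializer, as the earlier examples do, satisfies that.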
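- The Producer chooses a Partition based on the message key. How exactly is the key mapped to a Partition?
- A consumer group jointly consumes one copy of the messages spread across a Topic's Partitions. Can a Consumer instance decide which Partitions it consumes?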
public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";private static final String PARTITIONER_CLASS_DOC = "A class to use to determine which partition to be send to when produce the records. Available options are:" +"<ul>" +"<li>If not set, the default partitioning logic is used. " +"This strategy will try sticking to a partition until at least " + BATCH_SIZE_CONFIG + " bytes is produced to the partition. It works with the strategy:" +"<ul>" +"<li>If no partition is specified but a key is present, choose a partition based on a hash of the key</li>" +"<li>If no partition or key is present, choose the sticky partition that changes when at least " + BATCH_SIZE_CONFIG + " bytes are produced to the partition.</li>" +"</ul>" +"</li>" +"<li><code>org.apache.kafka.clients.producer.RoundRobinPartitioner</code>: This partitioning strategy is that " +"each record in a series of consecutive records will be sent to a different partition(no matter if the 'key' is provided or not), " +"until we run out of partitions and start over again. Note: There's a known issue that will cause uneven distribution when new batch is created. " +"Please check KAFKA-9965 for more detail." +"</li>" +"</ul>" +"<p>Implementing the <code>org.apache.kafka.clients.producer.Partitioner</code> interface allows you to plug in a custom partitioner.";
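The "hash of the key" rule in the DOC text above can be sketched as follows. This is a simplified illustration, not the client's implementation: `KeyPartitionSketch` is a hypothetical class, and `Arrays.hashCode` stands in for the murmur2 hash that the real default partitioner applies to the serialized key, so the partition numbers produced here will not match a real cluster.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class KeyPartitionSketch {
    /**
     * Map a key to a partition in [0, numPartitions).
     * Assumption: Arrays.hashCode is only a placeholder hash for illustration.
     */
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        // floorMod keeps the result non-negative even when the hash is negative.
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            String key = Integer.toString(i);
            System.out.println("key " + key + " -> partition " + partitionFor(key, 3));
        }
    }
}
```

The property that matters is determinism: the same key always lands on the same Partition as long as the partition count does not change, which is what keeps messages with one key in order.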
Kafka's message buffering mechanism involves two key components in KafkaProducer: accumulator and sender
int batchSize = Math.max(1, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
this.accumulator = new RecordAccumulator(logContext,batchSize,this.compressionType,lingerMs(config),retryBackoffMs,deliveryTimeoutMs, partitionerConfig,metrics,PRODUCER_METRIC_GROUP_NAME,time,apiVersions,transactionManager,new BufferPool(this.totalMemorySize, batchSize, metrics, time, PRODUCER_METRIC_GROUP_NAME));
// 2. The sender thread that transmits the data
this.sender = newSender(logContext, kafkaClient, this.metadata);
RecordAccumulator is the Kafka producer's message accumulator. Every message that KafkaProducer sends is first buffered in the RecordAccumulator and then sent to the Kafka broker in batches.
public static final String BUFFER_MEMORY_CONFIG = "buffer.memory";
private static final String BUFFER_MEMORY_DOC = "The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are "
        + "sent faster than they can be delivered to the server the producer will block for <code>" + MAX_BLOCK_MS_CONFIG + "</code> after which it will throw an exception."
        + "<p>"
        + "This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since "
        + "not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if "
        + "compression is enabled) as well as for maintaining in-flight requests.";
// Size of each batch in the buffer
public static final String BATCH_SIZE_CONFIG = "batch.size";
private static final String BATCH_SIZE_DOC = "The producer will attempt to batch records together into fewer requests whenever multiple records are being sent"+ " to the same partition. This helps performance on both the client and the server. This configuration controls the "+ "default batch size in bytes. "+ "<p>"+ "No attempt will be made to batch records larger than this size. "+ "<p>"+ "Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent. "+ "<p>"+ "A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable "+ "batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a "+ "buffer of the specified batch size in anticipation of additional records."+ "<p>"+ "Note: This setting gives the upper bound of the batch size to be sent. If we have fewer than this many bytes accumulated "+ "for this partition, we will 'linger' for the <code>linger.ms</code> time waiting for more records to show up. "+ "This <code>linger.ms</code> setting defaults to 0, which means we'll immediately send out a record even the accumulated "+ "batch size is under this <code>batch.size</code> setting.";
The DOC text above also mentions several other parameters, such as MAX_BLOCK_MS_CONFIG, which defaults to 60 seconds.
public static final String LINGER_MS_CONFIG = "linger.ms";private static final String LINGER_MS_DOC = "The producer groups together any records that arrive in between request transmissions into a single batched request. "+ "Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to "+ "reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount "+ "of artificial delay—that is, rather than immediately sending out a record, the producer will wait for up to "+ "the given delay to allow other records to be sent so that the sends can be batched together. This can be thought "+ "of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once "+ "we get <code>" + BATCH_SIZE_CONFIG + "</code> worth of records for a partition it will be sent immediately regardless of this "+ "setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the "+ "specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting <code>" + LINGER_MS_CONFIG + "=5</code>, "+ "for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load.";public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."+ " Note that if this configuration is set to be greater than 1 and <code>enable.idempotence</code> is set to false, there is a risk of"+ " message reordering after a failed send due to retries (i.e., if retries are enabled); "+ " if retries are disabled or if <code>enable.idempotence</code> is set to true, ordering will be preserved."+ " Additionally, enabling idempotence requires the value of this configuration to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "."+ " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. ";
if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), appendCallbacks.getPartition());
    this.sender.wakeup();
}
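The interaction between batch.size and the wake-up check above can be modeled with a small in-memory accumulator. Everything in this sketch is hypothetical (the class `AccumulatorSketch`, its `Batch` and `AppendResult` types, and the byte counting are simplifications invented for illustration); it only aims to show when the two flags become true.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class AccumulatorSketch {
    /** Outcome of one append, mirroring the two flags checked before waking the sender. */
    static final class AppendResult {
        final boolean batchIsFull;
        final boolean newBatchCreated;

        AppendResult(boolean batchIsFull, boolean newBatchCreated) {
            this.batchIsFull = batchIsFull;
            this.newBatchCreated = newBatchCreated;
        }
    }

    /** A batch only tracks how many bytes it holds in this model. */
    private static final class Batch {
        int bytesUsed;
    }

    private final int batchSize;
    private final Map<Integer, Deque<Batch>> batches = new HashMap<>();

    AccumulatorSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Append a record of recordBytes to the partition's newest batch, opening a new batch if it does not fit. */
    AppendResult append(int partition, int recordBytes) {
        Deque<Batch> queue = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        Batch last = queue.peekLast();
        boolean created = false;
        if (last == null || last.bytesUsed + recordBytes > batchSize) {
            last = new Batch();
            queue.addLast(last);
            created = true;
        }
        last.bytesUsed += recordBytes;
        // An older batch waiting in the queue, or a filled newest batch, means there is data ready to send.
        boolean full = queue.size() > 1 || last.bytesUsed >= batchSize;
        return new AppendResult(full, created);
    }

    public static void main(String[] args) {
        AccumulatorSketch accumulator = new AccumulatorSketch(100);
        for (int i = 0; i < 3; i++) {
            AppendResult r = accumulator.append(0, 40);
            boolean wakeSender = r.batchIsFull || r.newBatchCreated;
            System.out.println("full=" + r.batchIsFull + " created=" + r.newBatchCreated + " wake=" + wakeSender);
        }
    }
}
```

With a batch size of 100 and 40-byte records, the first append opens a batch, the second fits into it, and the third opens a second batch, at which point the first one is ready to be sent.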
public static final String ACKS_CONFIG = "acks";private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the "+ " durability of records that are sent. The following settings are allowed: "+ " <ul>"+ " <li><code>acks=0</code> If set to zero then the producer will not wait for any acknowledgment from the"+ " server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be"+ " made that the server has received the record in this case, and the <code>retries</code> configuration will not"+ " take effect (as the client won't generally know of any failures). The offset given back for each record will"+ " always be set to <code>-1</code>."+ " <li><code>acks=1</code> This will mean the leader will write the record to its local log but will respond"+ " without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after"+ " acknowledging the record but before the followers have replicated it then the record will be lost."+ " <li><code>acks=all</code> This means the leader will wait for the full set of in-sync replicas to"+ " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica"+ " remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting."+ "</ul>"+ "<p>"+ "Note that enabling idempotence requires this config value to be 'all'."+ " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.";
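- acks=0: the producer does not care whether the Broker has written the message to the Partition; it sends and moves on. Throughput is highest, but data safety is lowest.
- acks=all or -1: the producer receives a response only after the Leader Partition and its in-sync Follower Partitions have all written the message. This is the safest setting, but each send waits longer, so throughput is lowest.
- acks=1: a compromise between the two. The Leader Partition responds to the producer as soon as it has finished its own write.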
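The three levels map directly onto a single producer property. The sketch below builds the properties with plain string keys so that it runs without the Kafka client on the classpath; the `Durability` enum and the `AcksConfigSketch` class are hypothetical names introduced for this example, while `acks` and `bootstrap.servers` are the configuration keys quoted in this article.

```java
import java.util.Properties;

public class AcksConfigSketch {
    /** Hypothetical labels for the three trade-offs described above. */
    enum Durability { FASTEST, BALANCED, SAFEST }

    static Properties producerProps(Durability level) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "worker1:9092,worker2:9092,worker3:9092");
        switch (level) {
            case FASTEST:
                props.put("acks", "0");   // do not wait for any acknowledgment
                break;
            case BALANCED:
                props.put("acks", "1");   // wait for the leader's own write only
                break;
            default:
                props.put("acks", "all"); // wait for the in-sync replicas
        }
        return props;
    }

    public static void main(String[] args) {
        for (Durability level : Durability.values()) {
            System.out.println(level + " -> acks=" + producerProps(level).getProperty("acks"));
        }
    }
}
```

In a real producer the same value would be set through ProducerConfig.ACKS_CONFIG, alongside the serializer properties shown earlier.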
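Even when acks is set to all or -1, Kafka does not strictly require every replica to be written before responding. The Broker-side parameter min.insync.replicas sets the minimum number of in-sync replicas that must acknowledge a write before the Leader Partition reports success to the Producer. It can be set in the Broker's configuration file, server.properties.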
When a producer sets acks to "all" (or "-1"), min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend).
When used together, min.insync.replicas and acks allow you to enforce greater durability guarantees. A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with acks of "all". This will ensure that the producer raises an exception if a majority of replicas do not receive a write.
Type: int
Default: 1
Valid Values: [1,...]
Importance: high
Update Mode: cluster-wide
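The rule in the quoted documentation reduces to a simple predicate. The sketch below is a hypothetical model (`MinIsrSketch` and `writeAccepted` are invented names) of the decision for the typical scenario of replication factor 3 and min.insync.replicas=2; it ignores timing and replication details.

```java
public class MinIsrSketch {
    /**
     * Returns true if a write would be accepted.
     * The minimum only applies when the producer asks for acks=all (or -1).
     */
    static boolean writeAccepted(String acks, int inSyncReplicas, int minInsyncReplicas) {
        boolean requiresMinIsr = acks.equals("all") || acks.equals("-1");
        return !requiresMinIsr || inSyncReplicas >= minInsyncReplicas;
    }

    public static void main(String[] args) {
        int minIsr = 2;
        System.out.println(writeAccepted("all", 3, minIsr)); // true: all three replicas are in sync
        System.out.println(writeAccepted("all", 2, minIsr)); // true: one follower is down, minimum still met
        System.out.println(writeAccepted("all", 1, minIsr)); // false: the producer would see NotEnoughReplicas
        System.out.println(writeAccepted("1", 1, minIsr));   // true: the minimum is not checked for acks=1
    }
}
```

The third case is the trade-off: the topic refuses writes rather than accept a message that only one replica holds.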
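To keep messages safe, the two network steps for each message (the Producer's send request and the Broker's response) would need to be idempotent. Networks are unreliable, however, and under high concurrency this often cannot be guaranteed. If the first step succeeds but the second never returns, the Producer assumes the send failed and retries. The number of retries is controlled by ProducerConfig.RETRIES_CONFIG, whose default value is Integer.MAX_VALUE.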
public static final String ENABLE_IDEMPOTENCE_CONFIG = "enable.idempotence";public static final String ENABLE_IDEMPOTENCE_DOC = "When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer " + "retries due to broker failures, etc., may write duplicates of the retried message in the stream. "+ "Note that enabling idempotence requires <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE+ " (with message ordering preserved for any allowable value), <code>" + RETRIES_CONFIG + "</code> to be greater than 0, and <code>"+ ACKS_CONFIG + "</code> must be 'all'. "+ "<p>"+ "Idempotence is enabled by default if no conflicting configurations are set. "+ "If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. "+ "If idempotence is explicitly enabled and conflicting configurations are set, a <code>ConfigException</code> is thrown.";
// max.in.flight.requests.per.connection should be less than or equal to 5 when idempotence producer enabled to ensure message ordering
private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE = 5;
/** <code>max.in.flight.requests.per.connection</code> */
public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."
        + " Note that if this config is set to be greater than 1 and <code>enable.idempotence</code> is set to false, there is a risk of"
        + " message re-ordering after a failed send due to retries (i.e., if retries are enabled)."
        + " Additionally, enabling idempotence requires this config value to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "."
        + " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.";
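Back to the scenario of a Producer sending messages to a Broker. For at-most-once semantics, set acks to 0; no idempotence problem arises in that case. For at-least-once semantics, set acks to 1 or -1, which guarantees that the message is written to the Leader Partition at least once but not that it is written only once. What about exactly-once semantics? That requires the idempotence property.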
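- PID: every new Producer is assigned a unique PID during initialization. The PID is not visible to the user.
- Sequence Number: for each PID, the Producer maintains a sequenceNumber per Partition. It starts at 0 and increases monotonically: each time the Producer sends a message to the same Partition, the Sequence Number increases by 1 and is sent to the Broker along with the message.
- The Broker maintains a sequence number (SN) for each <PID, Partition> pair. It accepts a message only when the message's SequenceNumber = SN+1, and then updates SN to SN+1. If the SequenceNumber is smaller, the message has already been written and is not written again. If the SequenceNumber is larger, some data in between may have been lost, and the producer receives an OutOfOrderSequenceException.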
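The Broker-side check described in the list above can be sketched as a small state machine. This is an illustrative model only: `SequenceCheckSketch`, its `Outcome` enum, and the string map key are hypothetical, and a stored value of -1 is assumed to mean "nothing received yet" so that the first sequence number 0 is accepted.

```java
import java.util.HashMap;
import java.util.Map;

public class SequenceCheckSketch {
    enum Outcome { ACCEPTED, DUPLICATE, OUT_OF_ORDER }

    /** Last accepted sequence number (SN) per <PID, Partition>. */
    private final Map<String, Integer> lastSequence = new HashMap<>();

    Outcome onProduce(long pid, int partition, int sequenceNumber) {
        String key = pid + "-" + partition;
        int sn = lastSequence.getOrDefault(key, -1);
        if (sequenceNumber == sn + 1) {
            lastSequence.put(key, sequenceNumber); // accept and advance SN
            return Outcome.ACCEPTED;
        }
        if (sequenceNumber <= sn) {
            return Outcome.DUPLICATE;              // already written, e.g. a retry
        }
        return Outcome.OUT_OF_ORDER;               // gap: would surface as OutOfOrderSequenceException
    }

    public static void main(String[] args) {
        SequenceCheckSketch broker = new SequenceCheckSketch();
        System.out.println(broker.onProduce(1L, 0, 0)); // ACCEPTED
        System.out.println(broker.onProduce(1L, 0, 0)); // DUPLICATE
        System.out.println(broker.onProduce(1L, 0, 2)); // OUT_OF_ORDER
        System.out.println(broker.onProduce(1L, 0, 1)); // ACCEPTED
    }
}
```

A retried message carries the same sequence number as the original, so the second call is recognized as a duplicate instead of being written twice.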
// 1. Initialize transactions
void initTransactions();
// 2. Begin a transaction
void beginTransaction() throws ProducerFencedException;
// 3. Commit the transaction
void commitTransaction() throws ProducerFencedException;
// 4. Abort the transaction (similar to a rollback)
void abortTransaction() throws ProducerFencedException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class TransactionErrorDemo {
    private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";
    private static final String TOPIC = "disTopic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // Addresses (host:port) of the Kafka brokers
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // Transactional ID
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "111");
        // Serializer class for the key
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Serializer class for the value
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        producer.beginTransaction();
        for (int i = 0; i < 5; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);
            // Asynchronous send
            producer.send(record);
            if (i == 3) {
                // Abort when i == 3 (the fourth message): every message sent in this transaction is rolled back.
                System.out.println("error");
                producer.abortTransaction();
                // Stop here: the transaction has ended, so nothing more should be sent without a new beginTransaction().
                break;
            }
        }
        System.out.println("message sent");
        try {
            Thread.sleep(10000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        // producer.commitTransaction();
        producer.close();
    }
}
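The usual way to use these four methods is a commit-or-abort pattern: commit when every send in the batch succeeds, abort as soon as anything goes wrong. The sketch below demonstrates the pattern against a hypothetical in-memory stand-in (`FakeTxProducer` is invented for this example and is not a Kafka class), so it runs without a cluster. Buffering pending records in a list is a simplification; it only models the visible outcome that aborted messages are never seen as committed.

```java
import java.util.ArrayList;
import java.util.List;

public class TransactionPatternSketch {
    /** Hypothetical in-memory stand-in for a transactional producer. */
    static final class FakeTxProducer {
        final List<String> committed = new ArrayList<>();
        private final List<String> pending = new ArrayList<>();
        private boolean inTransaction = false;

        void beginTransaction() {
            if (inTransaction) throw new IllegalStateException("transaction already open");
            inTransaction = true;
        }

        void send(String value) {
            if (!inTransaction) throw new IllegalStateException("no open transaction");
            pending.add(value);
        }

        void commitTransaction() {
            committed.addAll(pending);
            pending.clear();
            inTransaction = false;
        }

        void abortTransaction() {
            pending.clear();
            inTransaction = false;
        }
    }

    /** Send all values in one transaction; an empty value simulates a business error. */
    static boolean sendAll(FakeTxProducer producer, List<String> values) {
        producer.beginTransaction();
        try {
            for (String value : values) {
                if (value.isEmpty()) throw new IllegalArgumentException("empty value");
                producer.send(value);
            }
            producer.commitTransaction();
            return true;
        } catch (RuntimeException e) {
            producer.abortTransaction(); // the whole batch is discarded
            return false;
        }
    }

    public static void main(String[] args) {
        FakeTxProducer producer = new FakeTxProducer();
        System.out.println(sendAll(producer, List.of("a", "b")));     // true
        System.out.println(sendAll(producer, List.of("c", "", "d"))); // false
        System.out.println(producer.committed);                       // [a, b]
    }
}
```

With the real client the structure of `sendAll` stays the same, with `producer.send(record)` inside the try block and `abortTransaction()` in the error path.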

