建服装类网站需要考虑的因素,企业所得税怎么征收几个点,网站建设 宁夏,深圳国税局网站怎么做票种核定一、同步消息
1、生产者
同步发送的意思就是#xff0c;一条消息发送之后#xff0c;会阻塞当前线程#xff0c;直至返回 ack。 由于 send 方法返回的是一个 Future 对象#xff0c;根据 Futrue 对象的特点#xff0c;我们也可以实现同 步发送的效果#xff0c;只需在调…一、同步消息
1、生产者
同步发送的意思就是一条消息发送之后会阻塞当前线程直至返回 ack。 由于 send 方法返回的是一个 Future 对象根据 Futrue 对象的特点我们也可以实现同 步发送的效果只需在调用 Future 对象的 get 方发即可。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class CustomProducerSync {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties new Properties();// 2. 给kafka配置对象添加配置信息bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hadoop102:9092);// key,value序列化必须properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);// 3. 创建kafka生产者对象KafkaProducerString,String kafkaProducer new KafkaProducerString,String(properties);// 4. 调用send方法发送消息for (int i 0; i 10; i) {// 默认为异步发送kafkaProducer.send(new ProducerRecord(first1, atguigu i));// 末尾加get为同步发送kafkaProducer.send(new ProducerRecord(first1, atguigu i)).get();}// 5. 关闭资源kafkaProducer.close();}
}
二、异步消息
1、生产者
异步消息有两种
1.1、普通异步
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class CustomProducer {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties new Properties();// 2. 给kafka配置对象添加配置信息bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hadoop102:9092);// key,value序列化必须properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);// 3. 创建kafka生产者对象KafkaProducerString,String kafkaProducer new KafkaProducerString,String(properties);// 4. 调用send方法发送消息for (int i 0; i 10; i) {kafkaProducer.send(new ProducerRecord(first, wtyy));}// 5. 关闭资源kafkaProducer.close();}
}
1.2、带回调函数的异步发送
回调函数会在 producer 收到 ack 时调用为异步调用该方法有两个参数分别是 RecordMetadata 和 Exception如果 Exception 为 null说明消息发送成功如果 Exception 不为 null说明消息发送失败。
注意消息发送失败会自动重试不需要我们在回调函数中手动重试。
import org.apache.kafka.clients.producer.*;import java.util.Properties;public class CustomProducerCallBack {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties new Properties();// 2. 给kafka配置对象添加配置信息bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hadoop102:9092);// key,value序列化必须// 序列化器的serialization是一个接口找到他的实现类// 我们一般都是使用Stringproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);// 3. 创建kafka生产者对象KafkaProducerString,String kafkaProducer new KafkaProducerString,String(properties);// 4. 调用send方法发送消息for (int i 0; i 10; i) {kafkaProducer.send(new ProducerRecord(first1, atguigu i),new Callback() {Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {//(1)消息发送成功 exception null 接受到服务端ack消息 调用该方法//(2)消息发送失败 exception ! null 也会调用该方法if (exception null) {System.out.println(metadata);//使用打印演示}else{exception.printStackTrace();//打印异常信息}}});}// 5. 关闭资源kafkaProducer.close();}
}
三、顺序消息
以订单为例
生产者将相同的key的订单状态事件推送到kafka的同一分区kafka 消费者接收消息消费者将消息提交给线程池线程池根据接收到的消息将订单状态事件使用路由策略选择其中一个线程将具有相同路由key的事件发送到同一个线程的阻塞队列中单个线程不停的从阻塞队列获取订单状态消息消费 RestController
public class OrderController {Autowiredprivate KafkaTemplateString, String kafkaTemplate;GetMapping(/send)public String send() throws InterruptedException {int size 1000;for (int i 0; i size; i) {OrderDto orderDto new InterOrderDto();orderDto.setOrderNo(i );orderDto.setPayStatus(getStatus(0));orderDto.setTimestamp(System.currentTimeMillis());//相同的key发送到相同的分区kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));TimeUnit.MILLISECONDS.sleep(10);orderDto.setPayStatus(getStatus(1));orderDto.setTimestamp(System.currentTimeMillis());kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));TimeUnit.MILLISECONDS.sleep(10);orderDto.setPayStatus(getStatus(2));orderDto.setTimestamp(System.currentTimeMillis());kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));}return success;}private String getStatus(int status){return status 0 ? 待支付 : status 1 ? 已支付 : 支付失败;}
}