区块链技术博客
www.b2bchain.cn

如何创建Kafka压缩主题 – java程序员分享

本文介绍了如何创建Kafka压缩主题 – java程序员分享,有助于帮助完成毕业设计以及求职,是一篇很好的资料。

对技术面试,学习经验等有一些体会,在此分享。

我有一个Kafka应用程序,该应用程序有一个producer谁会生成主题消息。 consumer然后从主题中获取消息,对给定的消息进行一些逻辑处理,然后将它们生成到另一个主题。
我正在使用ProducerRecordConsumerRecords

我希望我的应用创建2个compacted topics,然后使用它们。如果compacted topics已经存在,则仅显示一条消息并继续。

我的SimpleProducer课:

  package com.kafkatest.demo;  import java.util.*;  import org.apache.kafka.clients.producer.*; public class SimpleProducer extends Thread{     public static void main(String[] args) throws Exception{        String topicName = "nodesTopic";       String key = "Key1";       String value = "Value-1";        String key1 = "Key2";       String value1 = "Value-2";          Properties props = new Properties();       props.put("bootstrap.servers", "localhost:9092,localhost:9093");       props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");                props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");        Producer<String, String> producer = new KafkaProducer <>(props);        ProducerRecord<String, String> record = new ProducerRecord<>(topicName,key,value);       producer.send(record);         ProducerRecord<String, String> record2 = new ProducerRecord<>(topicName,key1,value1);       producer.send(record2);         ProducerRecord<String, String> record3 = new ProducerRecord<>(topicName,key,value);       producer.send(record3);         ProducerRecord<String, String> record4 = new ProducerRecord<>(topicName,key,value);       producer.send(record4);         ProducerRecord<String, String> record5 = new ProducerRecord<>(topicName,key,value);       producer.send(record5);         ProducerRecord<String, String> record6 = new ProducerRecord<>(topicName,key,value);       producer.send(record6);       producer.close();        System.out.println("SimpleProducer Completed.");    } } 

我的SimpleConsumer类:

   package com.kafkatest.demo;  import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.Arrays; import java.util.Properties;  import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord;  public class SimpleConsumer extends Thread{      public static void main(String[] args) {      Properties props1 = new Properties();     props1.put("bootstrap.servers", "localhost:9092,localhost:9093");     props1.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");              props1.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");      Producer<String, String> producer = new KafkaProducer <>(props1);      Duration duration = Duration.of(2, ChronoUnit.MINUTES);     String topicName = "nodesTopic";      Properties props = new Properties();     props.put("bootstrap.servers", "localhost:9092");     props.put("group.id", "consumer-tutorial");     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);       consumer.subscribe(Arrays.asList(topicName));      try {         while (true) {         try {             Thread.sleep(5000);         } catch (InterruptedException e) {             e.printStackTrace();         }         consumer.beginningOffsets(consumer.assignment());           ConsumerRecords<String, String> records = consumer.poll(duration);           for (ConsumerRecord<String, String> record : records) {             System.out.println(record.offset() + ": " + record.value());             System.out.println("Record: " + record.value().toLowerCase());             ProducerRecord<String, String> record1 = new ProducerRecord<>("forecastTopic", "Key", record.offset() + ". " + record.value().toLowerCase());             String a = "" + records.count();             ProducerRecord<String, String> record2 = new ProducerRecord<>("forecastTopic", "Key", record.offset() + ". " + a);             producer.send(record1);             producer.send(record2);           }         }       } finally {         producer.close();         consumer.close();       }      }  } 

当我运行bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic forecastTopic --from-beginning并运行生产者两次时,我得到

0. value-1 0. 6 1. value-2 1. 6 2. value-1 2. 6 3. value-1 3. 6 4. value-1 4. 6 5. value-1 5. 6 6. value-1 6. 6 7. value-2 7. 6 8. value-1 8. 6 9. value-1 9. 6 10. value-1 10. 6 11. value-1 11. 6 12. value-1 12. 6 13. value-2 13. 6 14. value-1 14. 6 15. value-1 15. 6 16. value-1 16. 6 17. value-1 17. 6 18. value-1 18. 6 19. value-2 19. 6 20. value-1 20. 6 21. value-1 21. 6 22. value-1 22. 6 23. value-1 23. 6 24. value-1 24. 6 25. value-2 25. 6 26. value-1 26. 6 27. value-1 27. 6 28. value-1 28. 6 29. value-1 29. 6 30. value-1 30. 6 31. value-2 31. 6 32. value-1 32. 6 33. value-1 33. 6 34. value-1 34. 6 35. value-1 35. 6 36. value-1 36. 6 37. value-2 37. 6 38. value-1 38. 6 39. value-1 39. 6 40. value-1 40. 6 41. value-1 41. 6 42. value-1 42. 6 43. value-2 43. 6 44. value-1 44. 6 45. value-1 45. 6 46. value-1 46. 6 47. value-1 47. 6 48. value-1 48. 12 49. value-2 49. 12 50. value-1 50. 12 51. value-1 51. 12 52. value-1 52. 12 53. value-1 53. 12 54. value-1 54. 12 55. value-2 55. 12 56. value-1 56. 12 57. value-1 57. 12 58. value-1 58. 12 59. value-1 59. 12 60. value-1 60. 6 61. value-2 61. 6 62. value-1 62. 6 63. value-1 63. 6 64. value-1 64. 6 65. value-1 65. 6 66. value-1 66. 6 67. value-2 67. 6 68. value-1 68. 6 69. value-1 69. 6 70. value-1 70. 6 71. value-1 71. 6 72. value-1 72. 6 73. value-2 73. 6 74. value-1 74. 6 75. value-1 75. 6 76. value-1 76. 6 77. value-1 77. 6 78. value-1 78. 6 79. value-2 79. 6 80. value-1 80. 6 81. value-1 81. 6 82. value-1 82. 6 83. value-1 83. 6 

我将log.cleanup.policy=compact放在server.properties文件中,但是它似乎不起作用,因为我在该主题中拥有所有83个偏移量。

谢谢。

java大神给出的解决方案

在server.properties中设置log.cleanup.policy=compact时,它将是创建新主题时的默认策略。如果您在创建主题后更改server.properties,则主题配置不会更改。

您可以更改主题配置以设置cleanup.policy=compact

由于日志清理器进行了压缩,因此您可能需要在主题上设置特定的delete.retention.ms,因为默认保留时间为24小时。

最后,压缩不会在活动段上发生。
见Kafka Log Compaction not starting

我正在使用Jackson 2库,并且尝试读取JSON响应,如下所示:{ "value":"Hello" } 当value为空时,JSON响应如下所示:{ "value":{} } 我的模型POJO类看起来像这样public class Hello { private String value; pu…

我有以下代码,我希望从情况2的情况下抛出ConcurrentModificationException,但它运行成功。据我所知,如果我对地图中的单个键执行相同的操作,则不会抛出异常,因为here但是当我重现这种具有两个案例的多个密钥的场景时,通过新密钥修改。通过现有密钥进行修改。情况1: Map<String,String> mp = new H…

我正在使用Retrofit来获取JSON答复。这是我实施的一部分("/api/report/list") Observable<Bills> listBill(@Query("employee_id") String employeeID); 而条例草案类是-public static class…

我有一个可以接受任何数量的INTEGER参数的方法:pages(int,int…)此方法是选择PDF文件的某些页面。以以下字符串类型存储的书页:String pages = "1,2,3,6,9"; 我想将此字符串作为方法的参数看起来像:pages(1,2,3,6,9); java大神给出的解决方案 使用流可以很容易地做到这一点:St…

java.net.URI.create("http://adserver.adtech.de/adlink|3.0") 抛出java.net.URISyntaxException: Illegal character in path at index 32: http://adserver.adtech.de/adlink|3.0 虽然n…

赞(0) 打赏
部分文章转自网络,侵权联系删除b2bchain区块链学习技术社区 » 如何创建Kafka压缩主题 – java程序员分享
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

b2b链

联系我们联系我们