So, for example, you can have: Oracle -> Kafka -> Hadoop/
For that, Kafka has two sets of clients:

  * Producers - Clients who produce / import data into Kafka
  * Consumers - Clients who consume / read the data from Kafka
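A minimal round trip with the two console clients looks like this. This is only a sketch: the broker address localhost:9092 matches the port in the configuration shown in the appendix, while the topic name "testTopic" is an assumed example:

<code>
# Produce a single message from stdin (topic name is an assumed example)
echo "hello from a producer" | kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic

# Consume everything in the topic, starting from the first offset
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --from-beginning
</code>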
=====Management=====
====Create a topic====
To create a topic we can use the following command, knowing the ZooKeeper port and the name of the topic of course. Here we assume ZooKeeper on its default port 2181 and an example topic named testTopic:
<code>
# Assumed values: default ZooKeeper port 2181 and example topic name "testTopic"
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic
</code>
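To verify that the topic exists, it can be listed or described with the same tool (same assumed names as above):

<code>
kafka-topics.sh --list --zookeeper localhost:2181
kafka-topics.sh --describe --zookeeper localhost:2181 --topic testTopic
</code>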
====Send message to Kafka====
Sending messages to Kafka is easy once you know the broker list and the topic to which you want the messages sent. The broker below listens on port 9092 (see the appendix); the topic name is the same assumed example:
<code>
# Command line reconstructed; broker port 9092 per the appendix, topic name assumed
[oracle@edvmr1p0 config]$ kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic
Testing Kafka in the context of OGG 12c for Big Data
Second message sent to the Kafka cluster
</code>
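Since the console producer reads standard input, an existing file can also be sent, each line becoming one message (the file name here is hypothetical):

<code>
# Each line of messages.txt becomes one Kafka message (file name is hypothetical)
kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic < messages.txt
</code>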
====Consume message====
To consume / read a message from Kafka, we can use the following command (broker port 9092 as configured in the appendix; the topic name and the --from-beginning flag are assumed):
<code>
# Broker port 9092 per the appendix; topic name and --from-beginning are assumed
[oracle@edvmr1p0 config]$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --from-beginning
Testing Kafka in the context of OGG 12c for Big Data
Second message sent to the Kafka cluster

Second message sent to the Kafka cluster
</code>
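The consumer above keeps running until interrupted; to read only a fixed number of messages and exit, the console consumer also accepts --max-messages (same assumed topic name as above):

<code>
# Read the first two messages and exit (topic name is an assumed example)
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --from-beginning --max-messages 2
</code>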
=====Appendix=====
The Kafka broker configuration (server.properties) used for this setup:
<code>
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
port=9092
delete.topic.enable=true
############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = security_protocol://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
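# (168 hours = 7 days)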
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# The port below is assumed to be the ZooKeeper default (2181).
zookeeper.connect=localhost:2181
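# With a chroot, a multi-server connection string would look like this
# (hypothetical host names):
# zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka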
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
</code>