======Kafka======

  
So, for example, you can have: Oracle -> Kafka -> Hadoop/HBase

For that, Kafka has two sets of clients:

  * Producers - Clients who produce / import data into Kafka
  * Consumers - Clients who consume / read the data from Kafka


{{ :kafkaarch.png?400 |}}
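
As a quick illustration of the two client roles, the console tools that ship with Kafka can play both parts. This is a minimal sketch, assuming a broker already running on localhost:9092 and an existing topic named test; the individual commands are covered in more detail below:

<Code:bash|Producer and consumer roles>
# Producer: reads lines from stdin and publishes each line as a message to the topic
echo "hello from a producer" | kafka-console-producer.sh --broker-list localhost:9092 --topic test

# Consumer: reads the messages back, starting from the beginning of the topic
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
</Code>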
  
=====Management=====
  
====Create a topic====
To create a topic, we can use the following command, knowing the ZooKeeper port and the name of the topic:
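
A minimal sketch of the command, assuming ZooKeeper runs on localhost:2181 (the single replication factor and partition are illustrative):

<Code:bash|Create Topic>
[oracle@edvmr1p0 config]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic ogg12cBigData
</Code>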
  
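====Produce a message====
To produce / send a message to Kafka, we can use the console producer, which reads lines from standard input and publishes each one to the topic. A minimal sketch, assuming the broker listens on localhost:9092:

<Code:bash|Produce/Send Message>
[oracle@edvmr1p0 config]$ kafka-console-producer.sh --broker-list localhost:9092 --topic ogg12cBigData
Testing Kafka in the context of OGG 12c for Big Data                  <- First Message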
  
Second message sent to the Kafka cluster                              <- Second Message
</Code>


====Consume a message====
To consume / read a message from Kafka, we can use the following command:


<Code:bash|Consume/Read Message>
[oracle@edvmr1p0 config]$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ogg12cBigData --from-beginning
Testing Kafka in the context of OGG 12c for Big Data
Second message sent to the Kafka cluster

Second message sent to the Kafka cluster
</Code>

=====Appendix=====
<Code:bash|Kafka server.properties>
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
port=9092
delete.topic.enable=true

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = security_protocol://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in a RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
</Code>
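
With this configuration in place, the broker can be started against it. A minimal sketch, assuming ZooKeeper uses the stock zookeeper.properties on localhost:2181 and the Kafka bin directory is on the PATH:

<Code:bash|Start ZooKeeper and Kafka>
[oracle@edvmr1p0 config]$ zookeeper-server-start.sh zookeeper.properties &
[oracle@edvmr1p0 config]$ kafka-server-start.sh server.properties
</Code>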