Configure Destination
Before we do anything else, we have to start Kafka:
Start Kafka
Start Kafka
[oracle@edvmr1p0 ~]$ kafka-server-start.sh $KAFKA_HOME/config/server.properties
[2020-11-12 12:36:22,697] INFO KafkaConfig values:
        advertised.host.name = null
        metric.reporters = []
        quota.producer.default = 9223372036854775807
        offsets.topic.num.partitions = 50
        log.flush.interval.messages = 9223372036854775807
        auto.create.topics.enable = true
        controller.socket.timeout.ms = 30000
        log.flush.interval.ms = null
        principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
        replica.socket.receive.buffer.bytes = 65536
        min.insync.replicas = 1
        replica.fetch.wait.max.ms = 500
        num.recovery.threads.per.data.dir = 1
        ssl.keystore.type = JKS
        sasl.mechanism.inter.broker.protocol = GSSAPI
        default.replication.factor = 1
        ssl.truststore.password = null
        log.preallocate = false
        sasl.kerberos.principal.to.local.rules = [DEFAULT]
        fetch.purgatory.purge.interval.requests = 1000
        ssl.endpoint.identification.algorithm = null
        replica.socket.timeout.ms = 30000
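The broker expects a running ZooKeeper (this configuration points it at localhost:2181). If ZooKeeper is not already up on the VM, a start along these lines should do it; a sketch, assuming the default zookeeper.properties that ships with the Kafka distribution:

zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties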
Configure Kafka Properties File
Configure Kafka Properties File (e.g. TRG_OGGHOME/dirprm/kafka.properties)
[oracle@edvmr1p0 dirprm]$ cat ~/labs/practice05/replicat/prac_5_1/kafka.properties
gg.handlerlist=kafka
gg.handler.kafka.type=kafka
gg.handler.kafka.KafkaProducerConfigFile=kafka_producer.properties
gg.handler.kafka.TopicName=oggbgtopic
gg.handler.kafka.format=json
gg.handler.kafka.BlockingSend=false
gg.handler.kafka.includeTokens=false
gg.handler.kafka.mode=tx

goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE

gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec

gg.classpath=dirprm/:/opt/kafka/libs/*

jvm.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
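gg.handler.kafka.TopicName names the topic the handler writes to. Because the broker runs with auto.create.topics.enable=true, oggbgtopic will be created automatically on the first write, but you can also pre-create it if you want explicit control over partitions and replication. A sketch, assuming the Kafka CLI scripts are on the PATH and ZooKeeper listens on localhost:2181 as in this lab:

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic oggbgtopic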
Configure Kafka Producer Properties file
Kafka Producer Properties file (e.g. TRG_OGGHOME/dirprm/kafka_producer.properties)
bootstrap.servers=localhost:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000
max.request.size=5024000
send.buffer.bytes=5024000
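These settings favour throughput over latency: gzip compression, 100 KB batches, and linger.ms=10000 lets the producer wait up to 10 seconds to fill a batch before sending. Before wiring GoldenGate in, you can confirm that the broker at bootstrap.servers is reachable by sending a throwaway message with the console producer; a sketch, using a hypothetical scratch topic name (auto-creation will create it):

kafka-console-producer.sh --broker-list localhost:9092 --topic scratchtopic

Type a line, press Enter, and exit with Ctrl+C; consuming it back with kafka-console-consumer.sh confirms the round trip.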
Configure GoldenGate (Replicat)
As before, we have to configure the GoldenGate Replicat:
Configure GoldenGate Replicat
[oracle@edvmr1p0 dirprm]$ trg
[oracle@edvmr1p0 oggtrg]$ ggsci

Oracle GoldenGate Command Interpreter
Version 12.2.0.1.160823 OGGCORE_OGGADP.12.2.0.1.0_PLATFORMS_161019.1437
Linux, x64, 64bit (optimized), Generic on Oct 19 2016 16:01:40
Operating system character set identified as UTF-8.

Copyright (C) 1995, 2016, Oracle and/or its affiliates. All rights reserved.

GGSCI (edvmr1p0) 1> info all

Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     RUNNING

GGSCI (edvmr1p0) 2> edit param rkafka

GGSCI (edvmr1p0) 3> view param rkafka

REPLICAT rkafka
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.properties
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP OGGSRC.*, TARGET OGGTRG.*;

GGSCI (edvmr1p0) 4> add replicat rkafka, exttrail ./dirdat/kf
REPLICAT added.

GGSCI (edvmr1p0) 5> start rkafka

Sending START request to MANAGER ...
REPLICAT RKAFKA starting

GGSCI (edvmr1p0) 6> info all

Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     RUNNING
REPLICAT    STARTING    RKAFKA      00:00:00      00:00:06

GGSCI (edvmr1p0) 7> info all

Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     RUNNING
REPLICAT    RUNNING     RKAFKA      00:00:00      00:00:08

GGSCI (edvmr1p0) 8>
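If RKAFKA stays in STARTING or abends instead of reaching RUNNING, the cause is usually the gg.classpath or one of the properties files; the detailed status and the process report are the first places to look. A sketch of the GGSCI commands to use:

info rkafka, detail
view report rkafka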
Test
To test the replication, we will insert some rows into the source table and check that they make it all the way through.
Insert data on Oracle Source DB
[oracle@edvmr1p0 oggsrc]$ sqlplus oggsrc/oracle@orcl

SQL*Plus: Release 12.1.0.2.0 Production on Thu Nov 12 12:46:13 2020

Copyright (c) 1982, 2014, Oracle.  All rights reserved.

Last Successful login time: Thu Nov 12 2020 12:26:00 +00:00

Connected to:
Oracle Database 12c Enterprise Edition Release 12.1.0.2.0 - 64bit Production
With the Partitioning, OLAP, Advanced Analytics and Real Application Testing options

SQL> insert into customer_prod select * from customer where customer_id < 101;

100 rows created.

SQL> commit;

Commit complete.

SQL>
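Before looking downstream, a quick sanity check in the same SQL*Plus session confirms the rows really landed in the source table (it should report 100 if the table was empty beforehand):

SQL> select count(*) from customer_prod where customer_id < 101;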
Check that the data has been replicated from the GG Extract to the GG Replicat:
Check GG Replication
--Source:
GGSCI (edvmr1p0) 4> send priex,stats

Sending STATS request to EXTRACT PRIEX ...

Start of Statistics at 2020-11-12 12:46:37.

DDL replication statistics (for all trails):

*** Total statistics since extract started ***
        Operations                                    6.00

Output to ./dirdat/in:

Extracting from OGGSRC.CUSTOMER_PROD to OGGSRC.CUSTOMER_PROD:

*** Total statistics since 2020-11-12 12:46:30 ***
        Total inserts                               100.00
        Total updates                                 0.00
        Total deletes                                 0.00
        Total discards                                0.00
        Total operations                            100.00

*** Daily statistics since 2020-11-12 12:46:30 ***
        Total inserts                               100.00
        Total updates                                 0.00
        Total deletes                                 0.00
        Total discards                                0.00
        Total operations                            100.00

*** Hourly statistics since 2020-11-12 12:46:30 ***
        Total inserts                               100.00
        Total updates                                 0.00
        Total deletes                                 0.00
        Total discards                                0.00
        Total operations                            100.00

*** Latest statistics since 2020-11-12 12:46:30 ***
        Total inserts                               100.00
        Total updates                                 0.00
        Total deletes                                 0.00
        Total discards                                0.00
        Total operations                            100.00

End of Statistics.

GGSCI (edvmr1p0) 5>

--Target
GGSCI (edvmr1p0) 8> send rkafka,stats

Sending STATS request to REPLICAT RKAFKA ...

Start of Statistics at 2020-11-12 12:46:47.

Replicating from OGGSRC.CUSTOMER_PROD to OGGTRG.CUSTOMER_PROD:

*** Total statistics since 2020-11-12 12:46:34 ***
        Total inserts                               100.00
        Total updates                                 0.00
        Total deletes                                 0.00
        Total discards                                0.00
        Total operations                            100.00

*** Daily statistics since 2020-11-12 12:46:34 ***
        Total inserts                               100.00
        Total updates                                 0.00
        Total deletes                                 0.00
        Total discards                                0.00
        Total operations                            100.00

*** Hourly statistics since 2020-11-12 12:46:34 ***
        Total inserts                               100.00
        Total updates                                 0.00
        Total deletes                                 0.00
        Total discards                                0.00
        Total operations                            100.00

*** Latest statistics since 2020-11-12 12:46:34 ***
        Total inserts                               100.00
        Total updates                                 0.00
        Total deletes                                 0.00
        Total discards                                0.00
        Total operations                            100.00

End of Statistics.

GGSCI (edvmr1p0) 9>
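If the Replicat reports operations but nothing shows up on the Kafka side, first check that the topic actually exists on the broker; a sketch, listing topics via the lab's local ZooKeeper:

kafka-topics.sh --list --zookeeper localhost:2181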
Finally, we can consume the data from Kafka:
Consume the data from Kafka
[oracle@edvmr1p0 ~]$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic oggbgtopic --from-beginning --zookeeper localhost:2181
{"table":"OGGTRG.CUSTOMER_PROD","op_type":"I","op_ts":"2020-11-12 12:46:28.000148","current_ts":"2020-11-12T12:46:34.075000","pos":"00000000000000002372","after":{"CUSTOMER_ID":1,"FIRST_NAME":"Jaden","LAST_NAME":"Poole","EMAIL_ADDRESS":"[email protected]","SSN":"16060407-7404","ADDRESS":"3640 Ac St.","CITY":"San Juan de Dios","ZIP_CODE":"31014","CUSTOMER_TYPE":1}}
{"table":"OGGTRG.CUSTOMER_PROD","op_type":"I","op_ts":"2020-11-12 12:46:28.000148","current_ts":"2020-11-12T12:46:34.076000","pos":"00000000000000002658","after":{"CUSTOMER_ID":2,"FIRST_NAME":"Jack","LAST_NAME":"Cox","EMAIL_ADDRESS":"[email protected]","SSN":"16040918-3654","ADDRESS":"Ap #305-8153 Libero Ave","CITY":"Cañas","ZIP_CODE":"40813","CUSTOMER_TYPE":4}}
{"table":"OGGTRG.CUSTOMER_PROD","op_type":"I","op_ts":"2020-11-12 12:46:28.000148","current_ts":"2020-11-12T12:46:34.077000","pos":"00000000000000002912","after":{"CUSTOMER_ID":3,"FIRST_NAME":"Anastasia","LAST_NAME":"Stewart","EMAIL_ADDRESS":"[email protected]","SSN":"16030603-2822","ADDRESS":"282-2584 Consectetuer Street","CITY":"San Juan (San Juan de Tibás)","ZIP_CODE":"50409","CUSTOMER_TYPE":7}}
{"table":"OGGTRG.CUSTOMER_PROD","op_type":"I","op_ts":"2020-11-12 12:46:28.000148","current_ts":"2020-11-12T12:46:34.077001","pos":"00000000000000003198","after":{"CUSTOMER_ID":4,"FIRST_NAME":"Rowan","LAST_NAME":"Fleming","EMAIL_ADDRESS":"[email protected]","SSN":"16700223-3166","ADDRESS":"7594 Adipiscing. Ave","CITY":"San Vicente","ZIP_CODE":"40102","CUSTOMER_TYPE":2}}
{"table":"OGGTRG.CUSTOMER_PROD","op_type":"I","op_ts":"2020-11-12 12:46:28.000148","current_ts":"2020-11-12T12:46:34.077002","pos":"00000000000000003448","after":{"CUSTOMER_ID":5,"FIRST_NAME":"Adena","LAST_NAME":"Williamson","EMAIL_ADDRESS":"[email protected]","SSN":
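The console consumer keeps tailing the topic until you stop it with Ctrl+C. For a bounded check, --max-messages makes it exit after the requested number of records, for example:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic oggbgtopic --from-beginning --max-messages 5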
With that, we have configured the full replication chain: Oracle RDBMS → GoldenGate (Extract) → GoldenGate (Replicat) → Apache Kafka
Appendix
Kafka server properties (e.g. $KAFKA_HOME/config/server.properties)
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
port=9092
delete.topic.enable=true

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = security_protocol://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000