Configure Destination
Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows.
We have to configure:
- Flume.properties file
- Flume RPC client properties file
Configure Flume Properties
In the target OGG for big data, create the following files
Create Flume.properties file (TRG_OGGHOME/dirprm/flume.properties)
gg.handlerlist = flume gg.handler.flume.type=flume gg.handler.flume.RpcClientPropertiesFile=custom-flume-rpc.properties gg.handler.flume.format=avro_op gg.handler.flume.mode=op gg.handler.flume.EventMapsTo=op gg.handler.flume.PropagateSchema=true gg.handler.flume.includeTokens=false goldengate.userexit.timestamp=utc goldengate.userexit.writers=javawriter javawriter.stats.display=TRUE javawriter.stats.full=TRUE gg.log=log4j gg.log.level=INFO gg.report.time=30sec gg.classpath=dirprm/:/opt/flume/lib/* jvm.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
Configure Flume RPC Client
To configure the client, we have to create custom-flume-rpc.properties in the same file:
Create customer RPC File (e.g. TRG_OGGHOME/dirprm/custom-flume-rpc.properties)
client.type=default hosts=h1 hosts.h1=localhost:41414 batch-size=100 connect-timeout=20000 request-timeout=20000
Start Flume
Start Flume
[oracle@edvmr1p0 conf]$ hdfs dfs -mkdir /user/oracle/flume [oracle@edvmr1p0 conf]$ flume-ng agent --conf /opt/flume/conf -f /opt/flume/conf/flume.conf -Dflume.root.logger=DEBUG,LOGFILE -n agent1 -Dorg.apache.flume.log.rawdata=true Info: Sourcing environment configuration script /opt/flume/conf/flume-env.sh Info: Including Hadoop libraries found via (/opt/hadoop/bin/hadoop) for HDFS access Info: Including HBASE libraries found via (/opt/hbase/bin/hbase) for HBASE access + exec /usr/java/latest/bin/java -Xms100m -Xmx2000m -Dcom.sun.management.jmxremote -Dflume.root.logger=DEBUG,LOGFILE -Dorg.apache.flume.log.rawdata=true -cp '/opt/flume/conf:/opt/flume/lib/*:/opt/hadoop/etc/hadoop:/opt/hadoop/share/hadoop/common/lib/*:/opt/hadoop/share/hadoop/common/*:/opt/hadoop/share/hadoop/hdfs:/opt/hadoop/share/hadoop/hdfs/lib/*:/opt/hadoop/share/hadoop/hdfs/*:/opt/hadoop/share/hadoop/yarn/lib/*:/opt/hadoop/share/hadoop/yarn/*:/opt/hadoop/share/hadoop/mapreduce/lib/*:/opt/hadoop/share/hadoop/mapreduce/*:/opt/hadoop/contrib/capacity-scheduler/*.jar:/opt/hbase/conf:/usr/java/latest/lib/tools.jar:/opt/hbase:/opt/hbase/lib/activation-1.1.jar:/opt/hbase/lib/aopalliance-1.0.jar:/opt/hbase/lib/apacheds-i18n-2.0.0-M15.jar:/opt/hbase/lib/apacheds-kerberos-codec-2.0.0-M15.jar:/opt/hbase/lib/api-asn1-api-1.0.0-M2
Configure the GG Replicat
Again, we have to configure the GG with the replicat:
Configure GoldenGate Replicat
[oracle@edvmr1p0 dirprm]$ trg [oracle@edvmr1p0 oggtrg]$ ggsci Oracle GoldenGate Command Interpreter Version 12.2.0.1.160823 OGGCORE_OGGADP.12.2.0.1.0_PLATFORMS_161019.1437 Linux, x64, 64bit (optimized), Generic on Oct 19 2016 16:01:40 Operating system character set identified as UTF-8. Copyright (C) 1995, 2016, Oracle and/or its affiliates. All rights reserved. GGSCI (edvmr1p0) 1> info all Program Status Group Lag at Chkpt Time Since Chkpt MANAGER RUNNING GGSCI (edvmr1p0) 2> edit param rflume REPLICAT rflume TARGETDB LIBFILE libggjava.so SET property=dirprm/flume.properties REPORTCOUNT EVERY 1 MINUTES, RATE GROUPTRANSOPS 10000 MAP OGGSRC.*, TARGET OGGTRG.*; :wq GGSCI (edvmr1p0) 3> add replicat rflume, exttrail ./dirdat/fl REPLICAT added. GGSCI (edvmr1p0) 4> start rflume Sending START request to MANAGER ... REPLICAT RFLUME starting GGSCI (edvmr1p0) 5> info all Program Status Group Lag at Chkpt Time Since Chkpt MANAGER RUNNING REPLICAT RUNNING RFLUME 00:00:00 00:00:07
Test
To test replication, we will simply insert some records and see if we can see them replicated in the log
Insert rows
[oracle@edvmr1p0 oggsrc]$ sqlplus oggsrc/oracle@orcl SQL*Plus: Release 12.1.0.2.0 Production on Thu Nov 12 12:12:27 2020 Copyright (c) 1982, 2014, Oracle. All rights reserved. Last Successful login time: Thu Nov 12 2020 11:40:44 +00:00 Connected to: Oracle Database 12c Enterprise Edition Release 12.1.0.2.0 - 64bit Production With the Partitioning, OLAP, Advanced Analytics and Real Application Testing options SQL> insert into customer_prod select * from customer where customer_id < 21; 20 rows created. SQL> commit; Commit complete. SQL>
Then, we can check the stats from the GoldenGate
Check GoldenGate Stats
--Source (Extract) GGSCI (edvmr1p0) 5> send priex, stats Sending STATS request to EXTRACT PRIEX ... Start of Statistics at 2020-11-12 12:13:16. DDL replication statistics (for all trails): *** Total statistics since extract started *** Operations 6.00 Output to ./dirdat/in: Extracting from OGGSRC.CUSTOMER_PROD to OGGSRC.CUSTOMER_PROD: *** Total statistics since 2020-11-12 12:13:12 *** Total inserts 20.00 Total updates 0.00 Total deletes 0.00 Total discards 0.00 Total operations 20.00 *** Daily statistics since 2020-11-12 12:13:12 *** Total inserts 20.00 Total updates 0.00 Total deletes 0.00 Total discards 0.00 Total operations 20.00 *** Hourly statistics since 2020-11-12 12:13:12 *** Total inserts 20.00 Total updates 0.00 Total deletes 0.00 Total discards 0.00 Total operations 20.00 *** Latest statistics since 2020-11-12 12:13:12 *** Total inserts 20.00 Total updates 0.00 Total deletes 0.00 Total discards 0.00 Total operations 20.00 End of Statistics. GGSCI (edvmr1p0) 6> --Target (Replicat) GGSCI (edvmr1p0) 23> send rflume, stats Sending STATS request to REPLICAT RFLUME ... Start of Statistics at 2020-11-12 12:13:26. Replicating from OGGSRC.CUSTOMER_PROD to OGGTRG.CUSTOMER_PROD: *** Total statistics since 2020-11-12 12:13:14 *** Total inserts 20.00 Total updates 0.00 Total deletes 0.00 Total discards 0.00 Total operations 20.00 *** Daily statistics since 2020-11-12 12:13:14 *** Total inserts 20.00 Total updates 0.00 Total deletes 0.00 Total discards 0.00 Total operations 20.00 *** Hourly statistics since 2020-11-12 12:13:14 *** Total inserts 20.00 Total updates 0.00 Total deletes 0.00 Total discards 0.00 Total operations 20.00 *** Latest statistics since 2020-11-12 12:13:14 *** Total inserts 20.00 Total updates 0.00 Total deletes 0.00 Total discards 0.00 Total operations 20.00 End of Statistics. GGSCI (edvmr1p0) 24>
Now, we can also check on the flume:
Check Apache Flume log
[New I/O worker #1] (org.apache.flume.source.AvroSource.appendBatch:378) - Avro source avro-source1: Received avro event batch of 1 events. [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.LoggerSink.process:95) - Event: {headers:{SCHEMA_NAME=OGGTRG, TABLE_NAME=CUSTOMER_PROD,SCHEMA_FINGERPRINT=1668461282719043198} body: 28 4F 47 47 54 52 47 2E 43 55 53 54 4F 4D 45 52 (OGGTRG.CUSTOMER
We can of course check the files on the HDFS as well:
Check HDFS
[oracle@edvmr1p0 oggsrc]$ hdfs dfs -ls /user/oracle/flume Found 1 items -rw-r--r-- 1 oracle supergroup 2460 2020-11-12 12:13 /user/oracle/flume/FlumeData.1605183195022 [oracle@edvmr1p0 oggsrc]$ [oracle@edvmr1p0 oggsrc]$ hdfs dfs -cat /user/oracle/flume/FlumeData.1605183195022 | head -50 { "type" : "record", "name" : "CUSTOMER_PROD", "namespace" : "OGGTRG", "fields" : [ { "name" : "table", "type" : "string" }, { "name" : "op_type", "type" : "string" }, { "name" : "op_ts", "type" : "string" }, { "name" : "current_ts", "type" : "string" }, { "name" : "pos", "type" : "string" }, { "name" : "primary_keys",
Niiice, so we have replicated 20 rows between: Oracle RDBMS → Golden Gate (Extract) → Golden Gate (Replicat) → Apache Flume → HDFS
Appendix
Flume config (/opt/flume/flume.conf)
# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # The configuration file needs to define the sources, # the channels and the sinks. # Sources, channels and sinks are defined per agent, # in this case called 'agent' agent.sources = seqGenSrc agent.channels = memoryChannel agent.sinks = loggerSink # For each one of the sources, the type is defined agent.sources.seqGenSrc.type = seq # The channel can be defined as follows. agent.sources.seqGenSrc.channels = memoryChannel # Each sink's type must be defined agent.sinks.loggerSink.type = logger #Specify the channel the sink should use agent.sinks.loggerSink.channel = memoryChannel # Each channel's type is defined. agent.channels.memoryChannel.type = memory # Other config values specific to each type of channel(sink or source) # can be defined as well # In this case, it specifies the capacity of the memory channel agent.channels.memoryChannel.capacity = 100 # Define a memory channel called ch1 on agent1 agent1.channels.ch1.type = memory # # # Define an Avro source called avro-source1 on agent1 and tell it # # to bind to 0.0.0.0:41414. Connect it to channel ch1. agent1.sources.avro-source1.channels = ch1 agent1.sources.avro-source1.type = avro agent1.sources.avro-source1.bind = 0.0.0.0 agent1.sources.avro-source1.port = 41414 # # # Define a logger sink that simply logs all events it receives # # and connect it to the other end of the same channel. agent1.sinks.log-sink1.channel = ch1 agent1.sinks.log-sink1.type = logger # # # Finally, now that we've defined all of our components, tell # # agent1 which ones we want to activate. agent1.channels = ch1 agent1.sources = avro-source1 agent1.sinks.hdfs-sink.channel = ch1 agent1.sinks.hdfs-sink.type = hdfs agent1.sinks.hdfs-sink.hdfs.path = hdfs://localhost:9000/user/oracle/flume agent1.sinks.hdfs-sink.hdfs.fileType = DataStream agent1.sinks = log-sink1 hdfs-sink
Log4J properties file (/opt/flume/conf/log4j.properties)
# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # # Define some default values that can be overridden by system properties. # # For testing, it may also be convenient to specify # -Dflume.root.logger=DEBUG,console when launching flume. #flume.root.logger=DEBUG,console flume.root.logger=TRACE,LOGFILE flume.log.dir=/home/oracle/labs/practice04/logs flume.log.file=flume.log log4j.logger.org.apache.flume.lifecycle = INFO log4j.logger.org.jboss = WARN log4j.logger.org.mortbay = INFO log4j.logger.org.apache.avro.ipc.NettyTransceiver = WARN log4j.logger.org.apache.hadoop = INFO log4j.logger.org.apache.hadoop.hive = ERROR # Define the root logger to the system property "flume.root.logger". log4j.rootLogger=${flume.root.logger} # Stock log4j rolling file appender # Default log rotation configuration log4j.appender.LOGFILE=org.apache.log4j.RollingFileAppender log4j.appender.LOGFILE.MaxFileSize=100MB log4j.appender.LOGFILE.MaxBackupIndex=10 log4j.appender.LOGFILE.File=${flume.log.dir}/${flume.log.file} log4j.appender.LOGFILE.layout=org.apache.log4j.PatternLayout log4j.appender.LOGFILE.layout.ConversionPattern=%d{dd MMM yyyy HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %x - %m%n # Warning: If you enable the following appender it will fill up your disk if you don't have a cleanup job! # This uses the updated rolling file appender from log4j-extras that supports a reliable time-based rolling policy. # See http://logging.apache.org/log4j/companions/extras/apidocs/org/apache/log4j/rolling/TimeBasedRollingPolicy.html # Add "DAILY" to flume.root.logger above if you want to use this log4j.appender.DAILY=org.apache.log4j.rolling.RollingFileAppender log4j.appender.DAILY.rollingPolicy=org.apache.log4j.rolling.TimeBasedRollingPolicy log4j.appender.DAILY.rollingPolicy.ActiveFileName=${flume.log.dir}/${flume.log.file} log4j.appender.DAILY.rollingPolicy.FileNamePattern=${flume.log.dir}/${flume.log.file}.%d{yyyy-MM-dd} log4j.appender.DAILY.layout=org.apache.log4j.PatternLayout log4j.appender.DAILY.layout.ConversionPattern=%d{dd MMM yyyy HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %x - %m%n # console # Add "console" to flume.root.logger above if you want to use this log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d (%t) [%p - %l] %m%n