Configure Destination

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows.

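On the target side we have to configure three things: the Flume handler properties, the Flume RPC client, and the GoldenGate Replicat.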

Configure Flume Properties

In the target Oracle GoldenGate (OGG) for Big Data home (TRG_OGGHOME), create the following files.

Create the flume.properties file (TRG_OGGHOME/dirprm/flume.properties)

gg.handlerlist = flume
gg.handler.flume.type=flume
gg.handler.flume.RpcClientPropertiesFile=custom-flume-rpc.properties
gg.handler.flume.format=avro_op
gg.handler.flume.mode=op
gg.handler.flume.EventMapsTo=op
gg.handler.flume.PropagateSchema=true
gg.handler.flume.includeTokens=false
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE

gg.log=log4j
gg.log.level=INFO

gg.report.time=30sec

gg.classpath=dirprm/:/opt/flume/lib/*
jvm.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
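
A quick sanity check of this file before starting the Replicat can save a restart. The following is a minimal shell sketch, not part of GoldenGate: the function names prop_get and check_flume_handler are hypothetical, the list of required keys is an assumption taken from the file above, and it assumes gg.handlerlist names a single handler.

```shell
# Hypothetical helpers, not shipped with GoldenGate.

# Print the value of KEY from a Java-style properties FILE (last match wins).
prop_get() {
  awk -v k="$2" '
    /^[ \t]*[#!]/ { next }                    # skip comment lines
    {
      i = index($0, "=")
      if (i == 0) next
      name = substr($0, 1, i - 1)
      val  = substr($0, i + 1)
      gsub(/^[ \t]+|[ \t]+$/, "", name)       # trim whitespace around the key
      gsub(/^[ \t]+|[ \t\r]+$/, "", val)      # trim whitespace around the value
      if (name == k) found = val
    }
    END { if (found != "") print found }' "$1"
}

# Return 0 if the handler named in gg.handlerlist has the keys used above.
# Assumption: gg.handlerlist holds exactly one handler name.
check_flume_handler() {
  handler=$(prop_get "$1" gg.handlerlist)
  if [ -z "$handler" ]; then
    echo "gg.handlerlist is not set" >&2
    return 1
  fi
  missing=0
  for key in type RpcClientPropertiesFile format mode; do
    if [ -z "$(prop_get "$1" "gg.handler.$handler.$key")" ]; then
      echo "missing: gg.handler.$handler.$key" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

It could be called as check_flume_handler TRG_OGGHOME/dirprm/flume.properties, with TRG_OGGHOME replaced by the actual target home.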

Configure Flume RPC Client

To configure the client, we have to create custom-flume-rpc.properties in the same directory (dirprm):

Create the custom RPC file (e.g. TRG_OGGHOME/dirprm/custom-flume-rpc.properties)

client.type=default
hosts=h1
hosts.h1=localhost:41414
batch-size=100
connect-timeout=20000
request-timeout=20000
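
The handler connects to the endpoints that this file lists under hosts. A minimal sketch for listing those endpoints and probing them before the Replicat is started; rpc_endpoints and probe_endpoints are hypothetical names, and the probe assumes that nc is installed and supports the -z and -w options.

```shell
# Hypothetical helpers for the RPC client file.

# Print host:port for every alias listed in the "hosts" property of FILE.
rpc_endpoints() {
  for name in $(sed -n 's/^[[:space:]]*hosts[[:space:]]*=[[:space:]]*//p' "$1"); do
    sed -n "s/^[[:space:]]*hosts\.$name[[:space:]]*=[[:space:]]*//p" "$1"
  done
}

# Try a TCP connection to each endpoint; return 1 if any of them fails.
# Assumption: an nc binary that accepts -z (scan only) and -w (timeout).
probe_endpoints() {
  rc=0
  for ep in $(rpc_endpoints "$1"); do
    if nc -z -w 5 "${ep%%:*}" "${ep##*:}"; then
      echo "reachable: $ep"
    else
      echo "NOT reachable: $ep" >&2
      rc=1
    fi
  done
  return "$rc"
}
```

With the file above, rpc_endpoints prints localhost:41414, which should match the bind address and port of the Avro source in the Flume agent configuration.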

Start Flume

Create the HDFS target directory, then start the Flume agent (agent1)

[oracle@edvmr1p0 conf]$ hdfs dfs -mkdir /user/oracle/flume
[oracle@edvmr1p0 conf]$ flume-ng agent --conf /opt/flume/conf -f /opt/flume/conf/flume.conf -Dflume.root.logger=DEBUG,LOGFILE -n agent1 -Dorg.apache.flume.log.rawdata=true
Info: Sourcing environment configuration script /opt/flume/conf/flume-env.sh
Info: Including Hadoop libraries found via (/opt/hadoop/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/opt/hbase/bin/hbase) for HBASE access
+ exec /usr/java/latest/bin/java -Xms100m -Xmx2000m -Dcom.sun.management.jmxremote -Dflume.root.logger=DEBUG,LOGFILE -Dorg.apache.flume.log.rawdata=true -cp '/opt/flume/conf:/opt/flume/lib/*:/opt/hadoop/etc/hadoop:/opt/hadoop/share/hadoop/common/lib/*:/opt/hadoop/share/hadoop/common/*:/opt/hadoop/share/hadoop/hdfs:/opt/hadoop/share/hadoop/hdfs/lib/*:/opt/hadoop/share/hadoop/hdfs/*:/opt/hadoop/share/hadoop/yarn/lib/*:/opt/hadoop/share/hadoop/yarn/*:/opt/hadoop/share/hadoop/mapreduce/lib/*:/opt/hadoop/share/hadoop/mapreduce/*:/opt/hadoop/contrib/capacity-scheduler/*.jar:/opt/hbase/conf:/usr/java/latest/lib/tools.jar:/opt/hbase:/opt/hbase/lib/activation-1.1.jar:/opt/hbase/lib/aopalliance-1.0.jar:/opt/hbase/lib/apacheds-i18n-2.0.0-M15.jar:/opt/hbase/lib/apacheds-kerberos-codec-2.0.0-M15.jar:/opt/hbase/lib/api-asn1-api-1.0.0-M2

Configure the GG Replicat

Next, we configure a GoldenGate Replicat that reads the trail and passes the records to the Flume handler:

Configure GoldenGate Replicat

[oracle@edvmr1p0 dirprm]$ trg
[oracle@edvmr1p0 oggtrg]$ ggsci

Oracle GoldenGate Command Interpreter
Version 12.2.0.1.160823 OGGCORE_OGGADP.12.2.0.1.0_PLATFORMS_161019.1437
Linux, x64, 64bit (optimized), Generic on Oct 19 2016 16:01:40
Operating system character set identified as UTF-8.

Copyright (C) 1995, 2016, Oracle and/or its affiliates. All rights reserved.



GGSCI (edvmr1p0) 1> info all

Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     RUNNING                                           


GGSCI (edvmr1p0) 2> edit param rflume

REPLICAT rflume
TARGETDB LIBFILE libggjava.so SET property=dirprm/flume.properties
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP OGGSRC.*, TARGET OGGTRG.*;
:wq

GGSCI (edvmr1p0) 3> add replicat rflume, exttrail ./dirdat/fl
REPLICAT added.


GGSCI (edvmr1p0) 4> start rflume

Sending START request to MANAGER ...
REPLICAT RFLUME starting


GGSCI (edvmr1p0) 5> info all

Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     RUNNING                                           
REPLICAT    RUNNING     RFLUME      00:00:00      00:00:07    
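
The status check can also be scripted. A minimal sketch, assuming ggsci accepts commands on standard input; group_status is a hypothetical helper that only parses the "info all" text shown above.

```shell
# Hypothetical helper: print the Status column for GROUP
# from "info all" output read on standard input.
group_status() {
  awk -v g="$1" 'toupper($3) == toupper(g) { print $2 }'
}

# Possible use from the target GoldenGate home (assumption: ggsci reads stdin):
#   status=$(echo "info all" | ./ggsci | group_status rflume)
#   [ "$status" = "RUNNING" ] || echo "rflume is not running: ${status:-unknown}" >&2
```

The helper matches on the Group column, so the MANAGER line, which has no group, is never returned.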

Test

To test replication, we insert some records in the source database and check that they show up in the Flume log and in HDFS.

Insert rows

[oracle@edvmr1p0 oggsrc]$ sqlplus oggsrc/oracle@orcl

SQL*Plus: Release 12.1.0.2.0 Production on Thu Nov 12 12:12:27 2020

Copyright (c) 1982, 2014, Oracle.  All rights reserved.

Last Successful login time: Thu Nov 12 2020 11:40:44 +00:00

Connected to:
Oracle Database 12c Enterprise Edition Release 12.1.0.2.0 - 64bit Production
With the Partitioning, OLAP, Advanced Analytics and Real Application Testing options

SQL> insert into customer_prod select * from customer where customer_id < 21;

20 rows created.

SQL> commit;

Commit complete.

SQL> 

Then we can check the GoldenGate statistics on both the source and the target:

Check GoldenGate Stats

--Source (Extract)
GGSCI (edvmr1p0) 5> send priex, stats

Sending STATS request to EXTRACT PRIEX ...

Start of Statistics at 2020-11-12 12:13:16.

DDL replication statistics (for all trails):

*** Total statistics since extract started     ***
	Operations                		           6.00

Output to ./dirdat/in:

Extracting from OGGSRC.CUSTOMER_PROD to OGGSRC.CUSTOMER_PROD:

*** Total statistics since 2020-11-12 12:13:12 ***
	Total inserts                   	          20.00
	Total updates                   	           0.00
	Total deletes                   	           0.00
	Total discards                  	           0.00
	Total operations                	          20.00

*** Daily statistics since 2020-11-12 12:13:12 ***
	Total inserts                   	          20.00
	Total updates                   	           0.00
	Total deletes                   	           0.00
	Total discards                  	           0.00
	Total operations                	          20.00

*** Hourly statistics since 2020-11-12 12:13:12 ***
	Total inserts                   	          20.00
	Total updates                   	           0.00
	Total deletes                   	           0.00
	Total discards                  	           0.00
	Total operations                	          20.00

*** Latest statistics since 2020-11-12 12:13:12 ***
	Total inserts                   	          20.00
	Total updates                   	           0.00
	Total deletes                   	           0.00
	Total discards                  	           0.00
	Total operations                	          20.00

End of Statistics.


GGSCI (edvmr1p0) 6> 


--Target (Replicat)
GGSCI (edvmr1p0) 23> send rflume, stats

Sending STATS request to REPLICAT RFLUME ...

Start of Statistics at 2020-11-12 12:13:26.

Replicating from OGGSRC.CUSTOMER_PROD to OGGTRG.CUSTOMER_PROD:

*** Total statistics since 2020-11-12 12:13:14 ***
	Total inserts                   	          20.00
	Total updates                   	           0.00
	Total deletes                   	           0.00
	Total discards                  	           0.00
	Total operations                	          20.00

*** Daily statistics since 2020-11-12 12:13:14 ***
	Total inserts                   	          20.00
	Total updates                   	           0.00
	Total deletes                   	           0.00
	Total discards                  	           0.00
	Total operations                	          20.00

*** Hourly statistics since 2020-11-12 12:13:14 ***
	Total inserts                   	          20.00
	Total updates                   	           0.00
	Total deletes                   	           0.00
	Total discards                  	           0.00
	Total operations                	          20.00

*** Latest statistics since 2020-11-12 12:13:14 ***
	Total inserts                   	          20.00
	Total updates                   	           0.00
	Total deletes                   	           0.00
	Total discards                  	           0.00
	Total operations                	          20.00

End of Statistics.


GGSCI (edvmr1p0) 24> 
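
Comparing the two reports by eye works for one table. For repeated tests, the insert counts can be pulled out of the stats text. A minimal sketch with hypothetical helper names; it reads only the first "Total inserts" line, so it assumes a single replicated table as in this test.

```shell
# Hypothetical helper: print the first "Total inserts" value
# from "send <group>, stats" output on standard input, as an integer.
total_inserts() {
  awk '/Total inserts/ { printf "%d\n", $NF; exit }'
}

# Compare the counts taken on the source and on the target.
compare_inserts() {
  if [ -n "$1" ] && [ "$1" = "$2" ]; then
    echo "in sync: $1 inserts on both sides"
  else
    echo "mismatch: source=${1:-?} target=${2:-?}" >&2
    return 1
  fi
}

# Possible use (assumption: ggsci reads commands from stdin; run each
# command from the matching GoldenGate home):
#   src=$(echo "send priex, stats"  | ./ggsci | total_inserts)
#   trg=$(echo "send rflume, stats" | ./ggsci | total_inserts)
#   compare_inserts "$src" "$trg"
```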

We can also check the Flume agent log:

Check Apache Flume log

[New I/O worker #1] (org.apache.flume.source.AvroSource.appendBatch:378) - Avro source avro-source1: Received avro event batch of 1 events.
[SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.LoggerSink.process:95) - Event: {headers:{SCHEMA_NAME=OGGTRG, TABLE_NAME=CUSTOMER_PROD,SCHEMA_FINGERPRINT=1668461282719043198} body: 28 4F 47 47 54 52 47 2E 43 55 53 54 4F 4D 45 52 (OGGTRG.CUSTOMER 

We can of course check the files in HDFS as well:

Check HDFS

[oracle@edvmr1p0 oggsrc]$ hdfs dfs -ls /user/oracle/flume
Found 1 items
-rw-r--r--   1 oracle supergroup       2460 2020-11-12 12:13 /user/oracle/flume/FlumeData.1605183195022
[oracle@edvmr1p0 oggsrc]$ 
[oracle@edvmr1p0 oggsrc]$ hdfs dfs -cat /user/oracle/flume/FlumeData.1605183195022 | head -50
{
  "type" : "record",
  "name" : "CUSTOMER_PROD",
  "namespace" : "OGGTRG",
  "fields" : [ {
    "name" : "table",
    "type" : "string"
  }, {
    "name" : "op_type",
    "type" : "string"
  }, {
    "name" : "op_ts",
    "type" : "string"
  }, {
    "name" : "current_ts",
    "type" : "string"
  }, {
    "name" : "pos",
    "type" : "string"
  }, {
    "name" : "primary_keys",

Nice, so we have replicated 20 rows through the whole chain: Oracle RDBMS → GoldenGate (Extract) → GoldenGate (Replicat) → Apache Flume → HDFS
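
The HDFS listing shown earlier can also be summarised from a script. A minimal sketch; flume_data_summary is a hypothetical helper that parses "hdfs dfs -ls" output, and it counts only files whose name ends in digits, on the assumption that files still being written carry an extra suffix such as .tmp.

```shell
# Hypothetical helper: count closed FlumeData files and sum their sizes
# (5th column) from "hdfs dfs -ls" output read on standard input.
flume_data_summary() {
  awk '$NF ~ /\/FlumeData\.[0-9]+$/ { n++; bytes += $5 }
       END { printf "%d files, %d bytes\n", n, bytes }'
}

# Possible use:
#   hdfs dfs -ls /user/oracle/flume | flume_data_summary
```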

Appendix

Flume config (/opt/flume/conf/flume.conf)

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.


# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'agent'

agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = loggerSink

# For each one of the sources, the type is defined
agent.sources.seqGenSrc.type = seq

# The channel can be defined as follows.
agent.sources.seqGenSrc.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.loggerSink.type = logger

#Specify the channel the sink should use
agent.sinks.loggerSink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100


# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414

# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = logger

# Define an HDFS sink that writes the events to /user/oracle/flume
# and connect it to the same channel.
agent1.sinks.hdfs-sink.channel = ch1
agent1.sinks.hdfs-sink.type = hdfs
agent1.sinks.hdfs-sink.hdfs.path = hdfs://localhost:9000/user/oracle/flume
agent1.sinks.hdfs-sink.hdfs.fileType = DataStream

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = log-sink1 hdfs-sink
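
One point worth knowing about this configuration: log-sink1 and hdfs-sink both read from ch1, and in Flume an event taken from a channel is delivered to only one of the sinks attached to it, so each sink sees part of the stream rather than a full copy. A minimal sketch for spotting such shared channels in a config file; shared_channels is a hypothetical helper, and it looks at every sink that has a channel property, whether or not the sink is listed as active.

```shell
# Hypothetical helper: print "<agent> <channel> <sink-count>" for every
# channel in a Flume config FILE that feeds more than one sink.
shared_channels() {
  awk -F'[ \t]*=[ \t]*' '
    /^[ \t]*#/ { next }                        # skip comment lines
    $1 ~ /^[^.]+\.sinks\.[^.]+\.channel$/ {
      split($1, part, ".")                     # part[1] is the agent name
      chan = $2
      sub(/[ \t\r]+$/, "", chan)               # drop trailing whitespace
      count[part[1] " " chan]++
    }
    END { for (k in count) if (count[k] > 1) print k, count[k] }' "$1"
}

# Possible use:
#   shared_channels /opt/flume/conf/flume.conf
```

For the file above this reports agent1 ch1 2. If every event should reach both the log and HDFS, a common arrangement is one channel per sink with the source writing to both channels.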

Log4J properties file (/opt/flume/conf/log4j.properties)

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Define some default values that can be overridden by system properties.
#
# For testing, it may also be convenient to specify
# -Dflume.root.logger=DEBUG,console when launching flume.

#flume.root.logger=DEBUG,console
flume.root.logger=TRACE,LOGFILE
flume.log.dir=/home/oracle/labs/practice04/logs
flume.log.file=flume.log

log4j.logger.org.apache.flume.lifecycle = INFO
log4j.logger.org.jboss = WARN
log4j.logger.org.mortbay = INFO
log4j.logger.org.apache.avro.ipc.NettyTransceiver = WARN
log4j.logger.org.apache.hadoop = INFO
log4j.logger.org.apache.hadoop.hive = ERROR

# Define the root logger to the system property "flume.root.logger".
log4j.rootLogger=${flume.root.logger}


# Stock log4j rolling file appender
# Default log rotation configuration
log4j.appender.LOGFILE=org.apache.log4j.RollingFileAppender
log4j.appender.LOGFILE.MaxFileSize=100MB
log4j.appender.LOGFILE.MaxBackupIndex=10
log4j.appender.LOGFILE.File=${flume.log.dir}/${flume.log.file}
log4j.appender.LOGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.LOGFILE.layout.ConversionPattern=%d{dd MMM yyyy HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %x - %m%n


# Warning: If you enable the following appender it will fill up your disk if you don't have a cleanup job!
# This uses the updated rolling file appender from log4j-extras that supports a reliable time-based rolling policy.
# See http://logging.apache.org/log4j/companions/extras/apidocs/org/apache/log4j/rolling/TimeBasedRollingPolicy.html
# Add "DAILY" to flume.root.logger above if you want to use this
log4j.appender.DAILY=org.apache.log4j.rolling.RollingFileAppender
log4j.appender.DAILY.rollingPolicy=org.apache.log4j.rolling.TimeBasedRollingPolicy
log4j.appender.DAILY.rollingPolicy.ActiveFileName=${flume.log.dir}/${flume.log.file}
log4j.appender.DAILY.rollingPolicy.FileNamePattern=${flume.log.dir}/${flume.log.file}.%d{yyyy-MM-dd}
log4j.appender.DAILY.layout=org.apache.log4j.PatternLayout
log4j.appender.DAILY.layout.ConversionPattern=%d{dd MMM yyyy HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %x - %m%n


# console
# Add "console" to flume.root.logger above if you want to use this
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d (%t) [%p - %l] %m%n