Tuesday 10 September 2019

CDL_PARQUET_WRAPPER_SCRIPT

#====================================================================================================================================================
# Title            : CDL_PARQUET_Wrapper
# ProjectName      : CDL BaseLine Loads
# Filename         : CDL_PARQUET_Wrapper.sh
# Description      : Script moves data from Avro Hive external table into Parquet Hive external table
# Developer       : xxxxxx
# Created on       : AUG 2017
# Location         : xxxxxx
# Logic            :
# Parameters       : Parameter file name
# Return codes     :
# Date            Ver#     Modified By(Name)                Change and Reason for Change
# ----------      -----    -----------------------------    --------------------------------------
# 2017/07/13      1                                         Initial Version

#  ***************************************************************************************************************************
######### The script below performs the following:
######### 1. Execute Reconciliation 1: Compare the source record count to the Avro table count. If the counts match, move the data to the Parquet table;
#########    if the counts do not match, abort the job.
######### 2. Insert data into the Parquet table
######### 3. Write LoadStartDate and LoadEndDate to the param file
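
######### Usage sketch (illustrative; the argument order is inferred from the reads of $1 and $2 below):
#########    sh CDL_PARQUET_Wrapper.sh <parameter_file> <db_conf>
######### where <parameter_file> is sourced for the table settings and later appended with audit exports,
######### and <db_conf> selects the DB server entry inside cdl-bl-db-env.conf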

echo "Execution of  parquet  Schema  Creation"
#Source DBSRVR config file
#Taking user input and passing to DBSRVR file
db_conf=$2
echo "DB Conf value"
echo $db_conf
echo $0
BIN_DIR=`dirname $0`
echo $BIN_DIR
echo "Sourcing .env file"
source $BIN_DIR/cdl-bl-db-env.conf $db_conf
#Source Hadoop config file
source $BIN_DIR/cdl-bl-env.conf

#Source the parameter file
echo $1
source $1

echo  $CDL_CONF
echo  $CDL_PARAM
echo  $CDL_LOGS


#Source bash_profile to get credentials for Teradata BTEQ Script
#source ~/.bash_profile

SummaryLogFileNm=CDL_AVRO_PARQUET_`date +%Y%m%d%H%M%S`_${TBL_NM}_$USER.log
SLogFullFileName=$CDL_LOGS/$SummaryLogFileNm
exec 1> $SLogFullFileName 2>&1
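
#From this point stdout and stderr are redirected into the summary log; note that fnLogMsg
#below also appends to the same file, so its messages appear twice in the log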


#Creating Parquet Table before inserting data.

 #sh CDL_PARQUET_TABLE_CREATION_WRAPPER.sh

### Variable Declarations
error_count=0
YYYY=`date +%Y`
MM=`date +%m`
RunDate=`date +%Y:%m:%d:%H:%M:%S`


fnLogMsg()
{
     LogTime=`date +%Y%m%d%H%M%S`
     LogMsg="$LogTime:$1:$2"
     echo $LogMsg >> $SLogFullFileName
     echo $LogMsg
}

fnStatusCheck()
{
     CommandRC="$1"
     SuccessMSG="$2"
     FailureMSG="$3"

     if [ $CommandRC -eq 0 ]
     then
         fnLogMsg INFO "${SuccessMSG}"
     else
         fnLogMsg ERROR "${FailureMSG}"
         error_count=`expr $error_count + 1`
         exit 1
     fi
}
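
#Example usage of the helpers above (illustrative):
#   some_command
#   fnStatusCheck $? "some_command succeeded" "some_command failed"
#   fnLogMsg INFO "free-form message"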



#Source bash_profile to get credentials for Teradata BTEQ Script

#source ./.bash_profile

#echo "Logon server name"

#echo $LOGON

#Check if the input file is a load_log file. If yes, skip the recon process



######### Reconciliation 1: Compare the source record count to the Avro table count. If the counts match, move the data to the Parquet table;
######### if the counts do not match, abort the job.

#Create the directory if it does not exist
dir=$CDL_LOGS/Reconciliation
if [[ ! -e $dir ]]; then
    mkdir $dir
elif [[ -d $dir ]]; then
    echo "$dir already exists"
fi

#Remove the temp file if it exists
AvroReconFile=$CDL_LOGS/Reconciliation/avro_count_recon_${TBL_NM}_$RUN_NB.txt
if [ -f "$AvroReconFile" ]
then
    rm $AvroReconFile
    echo "AvroReconFile temp file deleted"
fi

#Remove the temp file if it exists
TeradataReconFile=$CDL_LOGS/Reconciliation/Teradata_count_recon_${TBL_NM}_$RUN_NB.txt
if [ -f "$TeradataReconFile" ]
then
    rm $TeradataReconFile
    echo "TeradataReconFile temp file deleted"
fi


#GET Avro Table Count

#impala-shell -k --ssl -i $IMPALA_CMD -B  -q  "REFRESH ${cdl_schema_name_sg}.${cdl_incoming_table_name};SELECT count(*) from  ${cdl_schema_name_sg}.${cdl_incoming_table_name};" --output_file=$CDL_LOGS/Reconciliation/avro_count_recon_${TBL_NM}_$RUN_NB.txt

beeline -u $BEELINE_CMD  --outputformat=tsv2 --showHeader=false -e "SELECT count(*) from  ${cdl_schema_name_sg}.${cdl_incoming_table_name};">>$CDL_LOGS/Reconciliation/avro_count_recon_${TBL_NM}_$RUN_NB.txt

if [ $? -gt 0 ]; then
    echo "Error getting the count from the Avro external table"
    exit 1
fi


cat $CDL_LOGS/CDL_BTEQ_RECON_${TBL_NM}_$USER.log|grep 'Count' -A 2|tail -1 >>$CDL_LOGS/Reconciliation/Teradata_count_recon_${TBL_NM}_$RUN_NB.txt



#Getting the counts for Avro and Teradata
avroRecon_count=`cat $CDL_LOGS/Reconciliation/avro_count_recon_${TBL_NM}_$RUN_NB.txt`
TeradataRecon_count=`cat $CDL_LOGS/Reconciliation/Teradata_count_recon_${TBL_NM}_$RUN_NB.txt`


echo "avro file count =" $avroRecon_count
echo "Teradata source count =" $TeradataRecon_count
######Compare the Teradata count with the Avro count. If the counts match, proceed with the next steps; otherwise exit.

check_load_log_file=`echo $1 | grep 'load_log'`
if [ -z "$check_load_log_file" ]; then
    echo "File is not a load_log file. Teradata recon required"

    if [ $avroRecon_count -eq $TeradataRecon_count ]
    then
        echo "Teradata and Avro counts match"
    else
        if [ "$STAGE_TABLE_NEEDED" == "YES" ]
        then
            echo "Teradata recon count and Avro count do not match"
            exit 1
        else
            echo "Teradata recon count and Avro count do not match"
            echo "Teradata recon count and Avro count do not match, but the process continues to load the data into Hive" |mail -s "Teradata recon count and Avro count do not match for ${TBL_NM}" dl-cdl_fndtn_lyr@anthem.com
            #exit 1
        fi
    fi
else
    echo "load_log file. Skipping recon process"
    echo $check_load_log_file
fi
#Export the Avro table count for the audit wrapper
avroRecon_cnt=${avroRecon_count//[[:blank:]]/}
echo export avroRecon_count=$avroRecon_cnt >>$1
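
#At this point the param file passed as $1 carries an extra line such as (illustrative value):
#   export avroRecon_count=12345
#which the audit wrapper picks up when it sources the same file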



################################### STEP 2 : Move the data from the Avro table into the Parquet table
echo "STEP 2 : Move the data from the Avro table into the Parquet table"

### Execution of script starts here
echo "Values required for Parquet insert"
echo $cdl_incoming_table_name
echo $cdl_table_name
echo $cdl_schema_name_wh
echo $is_partition


#Table name for the Hive query
tbl_nm_hive="'$cdl_table_name'"
echo "Hive query table name"
echo $tbl_nm_hive

############ Get the updated load log key (CDH_LOAD_LOG_KEY)



#Delete the temp file if it exists

CDLFile=$CDL_LOGS/cdl_count_$TBL_NM.txt
if [ -f "$CDLFile" ]
then
    rm $CDLFile
    echo "CDLFile temp file deleted"
fi


# Table name in the CDH_LOAD_LOG query to get the max(CDH_LOAD_LOG_KEY) value
teradata_table_name=$TBL_NM
teradata_table_name_for_hive="'$teradata_table_name'"
echo "Hive query table name"
echo $teradata_table_name_for_hive
process_nm_param=$PROCESS_NM
process_nm_hive="'$process_nm_param'"
echo "Process Name"
echo $process_nm_hive
subj_area_param=$SUBJ_AREA_NM
subj_area_hive="'$subj_area_param'"
echo "subj_area_hive Name"
echo $subj_area_hive
#Connecting to beeline to get the max CDH_LOAD_LOG_KEY
echo "Schema name"
echo $cdl_schema_name_sg
echo  beeline -u $BEELINE_CMD --outputformat=tsv2 --showHeader=false -e "set hive.exec.max.dynamic.partitions.pernode=500;set hive.auto.convert.join=false;set hive.exec.dynamic.partition=true; set hive.exec.dynamic.partition.mode=nonstrict; select max(CDH_LOAD_LOG_KEY) FROM $cdl_schema_name_sg.CDH_LOAD_LOG where SUB_PROCESS_NM=$teradata_table_name_for_hive and SUBJ_AREA_NM=$subj_area_hive and PROCESS_NM=$process_nm_hive limit 1;"



#beeline -u $BEELINE_CMD --outputformat=tsv2 --showHeader=false -e "set hive.exec.max.dynamic.partitions.pernode=500;set hive.auto.convert.join=false;set hive.exec.dynamic.partition=true; set hive.exec.dynamic.partition.mode=nonstrict; select max(CDH_LOAD_LOG_KEY) FROM $cdl_schema_name_sg.CDH_LOAD_LOG where SUB_PROCESS_NM=$teradata_table_name_for_hive and SUBJ_AREA_NM=$subj_area_hive and PROCESS_NM=$process_nm_hive limit 1;" > $CDL_LOGS/cdl_count_$TBL_NM.txt


#Check load_log_key-If NULL then set to 1 otherwise load_log_key+1
#load_log_key=`cat $CDL_LOGS/cdl_count_$TBL_NM.txt`
#echo "beeline result"
#echo $load_log_key
#First entry of table in Audit table
#if [ "$load_log_key" == "NULL" ]; then
#CDH_LOAD_LOG_KEY=1
#echo $CDH_LOAD_LOG_KEY
#  echo "load_log_key is null -initializing to 1"
#else
#CDH_LOAD_LOG_KEY=`expr $load_log_key + 1`
CDH_LOAD_LOG_KEY=`java -jar $CDL_BIN/Unique_number/UniqueNumber.jar`
#fi
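
#Illustrative guard (assumption: UniqueNumber.jar prints a single key on stdout; an empty key
#would corrupt the audit exports written to the param file below)
if [ -z "$CDH_LOAD_LOG_KEY" ]; then
    echo "Failed to generate CDH_LOAD_LOG_KEY"
    exit 1
fi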

#Checking Table Partition
#LoadStartDate=`date +%Y-%m-%d %H:%M:%S`
#Exporting For Audit table
LoadStartDate=`date +%Y:%m:%d:%H:%M:%S`
echo "LoadStartDate******************"
echo $LoadStartDate

echo "Writing variables in file"
echo $1
#echo export CDH_LOAD_LOG_KEY=$CDH_LOAD_LOG_KEY >>$1
echo export CDH_LOAD_LOG_KEY=$CDH_LOAD_LOG_KEY >>$1
echo export LoadStartDate=$LoadStartDate >>$1




#Note: 'set hive.execution.engine=mapreduce' is a Hive statement; running it in bash would
#overwrite the positional parameters (including the param file path in $1), so it is commented out
#set hive.execution.engine=mapreduce

echo CDH_LOAD_LOG_KEY : $CDH_LOAD_LOG_KEY
echo RUN_NB: $RUN_NB

#Check RUN_NB from the param file; if RUN_NB=1, delete the old Parquet table data
if [ $RUN_NB -eq 1 ]
then
    # hadoop fs -rmr -skipTrash $PARQUET_LOCATION
    hadoop fs -rmr -skipTrash $PARQUET_LOCATION/*
    echo "Parquet old files deleted"
    #Initialize the table count to 0
    PQT_ROWS_LOADED_NB=0
    echo "Count of Parquet table"
    echo $PQT_ROWS_LOADED_NB
fi
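
#Note: 'hadoop fs -rmr' is deprecated on newer Hadoop releases; the equivalent there would be
#   hdfs dfs -rm -r -skipTrash $PARQUET_LOCATION/*
#(the -rmr form is kept above to match the CDH 5.12 tooling referenced later in this script)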

#Create the directory if it does not exist
audit_dir=$CDL_TEMP_AUDIT/Audit
if [[ ! -e $audit_dir ]]; then
    mkdir $audit_dir
elif [[ -d $audit_dir ]]; then
    echo "$audit_dir already exists"
fi

ParquetFile=$audit_dir/parquet_count_${TBL_NM}_$USER.txt

if [ -f "$ParquetFile" ]
then
    rm $ParquetFile
    echo "Parquet temp file deleted"
fi

#Get the parquet table name
parquet_table_name=$cdl_table_name

#Check the Parquet count before inserting data, to avoid duplication
if [ $RUN_NB != 1 ]
then

echo "RUN_NB is not 1. Checking the Parquet table count"
#Get the parquet table count
#echo beeline -u $BEELINE_CMD  --outputformat=tsv2 --showHeader=false -e "SELECT count(*) from  ${cdl_schema_name_wh}.${parquet_table_name} where ${PARQUET_RECON_CON};" >> $audit_dir/parquet_count_$TBL_NM_$USER.txt

beeline -u $BEELINE_CMD  --outputformat=tsv2 --showHeader=false -e "SELECT count(*) from  ${cdl_schema_name_wh}.${parquet_table_name} where ${PARQUET_RECON_CON};" >> $audit_dir/parquet_count_${TBL_NM}_$USER.txt

#impala-shell -k --ssl -i $IMPALA_CMD -B  -q  "invalidate metadata ${cdl_schema_name_wh}.${parquet_table_name} ;SELECT count(*) from  ${cdl_schema_name_wh}.${parquet_table_name} where ${PARQUET_RECON_CON};" --output_file=$audit_dir/parquet_count_$TBL_NM_$USER.txt

#Check whether the command executed successfully
if [ $? -eq 0 ]; then
    echo "Successfully got the count from the Parquet external table"
else
    echo "Beeline command to get the count from the Parquet external table failed"
    exit 1
fi



#Getting the Parquet count
echo "Parquet count from temp file"
parquet_count=`cat $audit_dir/parquet_count_${TBL_NM}_$USER.txt`

echo $parquet_count

#Parquet table count with whitespace stripped
PQT_ROWS_LOADED_NB=${parquet_count//[[:blank:]]/}

echo $PQT_ROWS_LOADED_NB

#End of RUN_NB if condition
fi


#Compare the counts before deleting the temp files and tables
if [ "$PQT_ROWS_LOADED_NB" == "0" ]
then


#SPARK2.2

echo spark2-submit --verbose --class com.anthem.cdl.preprocess.CDL_DDL_Insertion --master yarn --deploy-mode cluster --queue ${QUEUE_NAME} --driver-memory 8G --executor-memory 30G --executor-cores 4 --num-executors 40 --conf spark.driver.maxResultSize=3g --conf spark.yarn.executor.memoryOverhead=8192 --conf spark.yarn.driver.memoryOverhead=8192 --conf spark.network.timeout=600 --conf hive.execution.engine=spark --conf spark.port.maxRetries=20 --conf spark.sql.broadcastTimeout=4800 --conf spark.executor.heartbeatInterval=30s --conf spark.dynamicAllocation.initialExecutors=10 --conf spark.dynamicAllocation.minExecutors=10 --conf spark.dynamicAllocation.maxExecutors=40 --files /etc/spark2/conf.cloudera.spark2_on_yarn/yarn-conf/hive-site.xml --jars /opt/cloudera/parcels/CDH-5.12.2-1.cdh5.12.2.p0.4/jars/hive-contrib-1.1.0-cdh5.12.2.jar,/usr/lib/tdch/1.5/lib/terajdbc4.jar,/usr/lib/tdch/1.5/lib/tdgssconfig.jar --name $TBL_NM $BIN_DIR/$SPARK_JAR $PARQUET_TEMP_LOCATION/final_parquet_data_$cdl_table_name.txt $cdl_schema_name_wh $cdl_schema_name_sg $cdl_table_name $cdl_incoming_table_name $PARQUET_LOCATION $CDH_LOAD_LOG_KEY $PARTITION_COLUMN $PARTITION_COLUMN_1

echo "Insert to Parquet from Avro started"

export JAVA_HOME=/usr/java/latest

spark2-submit --verbose --class com.anthem.cdl.preprocess.CDL_DDL_Insertion --master yarn --deploy-mode cluster --queue ${QUEUE_NAME} --driver-memory 8G --executor-memory 30G --executor-cores 4 --num-executors 40 --conf spark.driver.maxResultSize=3g --conf spark.yarn.executor.memoryOverhead=8192 --conf spark.yarn.driver.memoryOverhead=8192 --conf spark.network.timeout=600 --conf hive.execution.engine=spark --conf spark.port.maxRetries=20 --conf spark.sql.broadcastTimeout=4800 --conf spark.executor.heartbeatInterval=30s --conf spark.dynamicAllocation.initialExecutors=10 --conf spark.dynamicAllocation.minExecutors=10 --conf spark.dynamicAllocation.maxExecutors=40 --files /etc/spark2/conf.cloudera.spark2_on_yarn/yarn-conf/hive-site.xml --jars /opt/cloudera/parcels/CDH-5.12.2-1.cdh5.12.2.p0.4/jars/hive-contrib-1.1.0-cdh5.12.2.jar,/usr/lib/tdch/1.5/lib/terajdbc4.jar,/usr/lib/tdch/1.5/lib/tdgssconfig.jar --name $TBL_NM $BIN_DIR/$SPARK_JAR $PARQUET_TEMP_LOCATION/final_parquet_data_$cdl_table_name.txt $cdl_schema_name_wh $cdl_schema_name_sg $cdl_table_name $cdl_incoming_table_name $PARQUET_LOCATION $CDH_LOAD_LOG_KEY $PARTITION_COLUMN $PARTITION_COLUMN_1
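
#Positional arguments handed to the CDL_DDL_Insertion job (order taken from the command above;
#the role of each argument is an assumption based on the variable names):
#   final_parquet_data_<table>.txt temp file, warehouse schema, staging schema, parquet table,
#   avro (incoming) table, parquet HDFS location, CDH_LOAD_LOG_KEY, partition column(s)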


#Get the Spark application tracking URL from the summary log
application_url=`grep tracking $CDL_LOGS/$SummaryLogFileNm|head -1`

#Strip any trailing slash from the URL
application_id=$(echo $application_url | sed 's:/*$::')

#Extract the application id (last path segment of the tracking URL)
application_name=`echo $application_id|rev | cut -d'/' -f 1 | rev`

#Remove the temp file if present
rm -f $CDL_LOGS/temp_app_details_${TBL_NM}_$USER.txt

#Get application status details and save in temp file
yarn application --status $application_name >$CDL_LOGS/temp_app_details_${TBL_NM}_$USER.txt

#Get the application final status
app_status=`grep Final-State $CDL_LOGS/temp_app_details_${TBL_NM}_$USER.txt`
final_app_status=`echo $app_status|rev | cut -d':' -f 1 | rev|tail -1`
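
#The status file is expected to contain a line like (illustrative):
#   Final-State : SUCCEEDED
#so final_app_status resolves to the value after the last ':'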

#echo $final_app_status
status="SUCCEEDED"

echo $final_app_status
echo $status
#Compare application status
if [ "$final_app_status" == "$status" ]
then
    echo "Parquet data inserted successfully using the Spark code"
else
    echo "Spark job failed. Please check the log"
    exit 1
fi

#Refresh the table partitions
beeline -u $BEELINE_CMD  --outputformat=tsv2 --showHeader=false -e "msck repair table ${cdl_schema_name_wh}.${parquet_table_name}";
#Check whether the command executed successfully
if [ $? -eq 0 ]; then
    echo "Successfully refreshed the table ${cdl_schema_name_wh}.${parquet_table_name}"
else
    echo "Beeline command to refresh table ${cdl_schema_name_wh}.${parquet_table_name} failed"
    exit 1
fi


#Checking script execution success/failure
insrtRC=$?
fnStatusCheck $insrtRC "Insert to Parquet from Avro completed .... $cdl_schema_name_wh is loaded" "Error loading $cdl_table_name"

fnLogMsg INFO "CDL load process is completed for table : $cdl_table_name"



#Load start/end timestamp
rm -f $CDL_LOGS/ParquetRun.txt
LoadEndDate=`date +%Y:%m:%d:%H:%M:%S`
echo $LoadEndDate >>$CDL_LOGS/ParquetRun.txt
echo "Avro Parquet Insert Job Completed"
echo $LoadEndDate


echo "Writing into param file for Audit log"
echo $1
echo export CDH_LOAD_LOG_KEY=$CDH_LOAD_LOG_KEY >>$1
echo export LoadStartDate=$LoadStartDate >>$1
echo export LoadEndDate=$LoadEndDate >>$1
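
#After a successful run the param file ends with lines like (illustrative values):
#   export CDH_LOAD_LOG_KEY=987654
#   export LoadStartDate=2019:09:10:01:02:03
#   export LoadEndDate=2019:09:10:01:45:00
#ready for the audit wrapper to source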

else

echo "ParquetTable:Data Already inserted.Please Check Parquet Table Data"

fi


