Automated Spark-SQL Submission and Execution
Overview
The core program is a Python script. A shell wrapper submits it to the cluster with spark-submit, and the Python job then executes each SQL statement found in the given SQL file:
- Python script: the main program; splits the SQL file into individual statements on semicolons (a semicolon at the end of a line) and executes them in order
- Shell script: the entry point; submits the Python script to the cluster via spark-submit and finally fetches the YARN log
Directory layout (as referenced by the scripts):
- exec_spark_sql.sh: the submission entry point
- conf/principal.conf: Kerberos principal/keytab and YARN queue settings (see the sample below)
- bin/run_spark_sql.py: the PySpark main program
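The shell script pulls principal, keyTab and queue out of conf/principal.conf with a simple grep/awk over key=value lines, so a minimal file could look like the sketch below. The values are placeholders to adapt to your environment; keep one key=value per line and avoid extra lines containing the key names, since the grep is not anchored.

```
principal=spark_user@EXAMPLE.COM
keyTab=/etc/security/keytabs/spark_user.keytab
queue=default
```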
Shell script
Script name: exec_spark_sql.sh
Script argument: absolute path of the SQL file to execute (e.g. /tmp/test.sql)
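For a quick end-to-end check, a hypothetical run could look like this (the SQL statements are placeholders; each statement must end with a semicolon at the end of a line, because that is what the Python script splits on):

```bash
# Write a small SQL file, then hand it to the submission script
# (run from the directory that holds exec_spark_sql.sh).
cat > /tmp/test.sql <<'EOF'
SHOW DATABASES;
SELECT 1;
EOF

bash exec_spark_sql.sh /tmp/test.sql
```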
Source code:
```bash
#!/bin/bash
# Entry point: submit run_spark_sql.py to YARN via spark-submit and fetch the driver log.
sqlFile=$1
jobName=$(basename $0)_$(basename $1)
currDate=$(date +"%Y%m%d")
shellDir=$(cd "$(dirname $0)"; pwd)
confFile=${shellDir}/conf/principal.conf

if [ ! -f ${confFile} ];then
    echo "Config file ${confFile} does not exist!"
    exit 1
fi

if [ ! -f ${sqlFile} ];then
    echo "SQL file ${sqlFile} does not exist!"
    exit 1
fi

# principal.conf holds key=value lines for principal, keyTab and queue.
principal=$(cat ${confFile} | grep -i principal | awk -F'=' '{print $2}')
keyTab=$(cat ${confFile} | grep -i keyTab | awk -F'=' '{print $2}')
queue=$(cat ${confFile} | grep -i queue | awk -F'=' '{print $2}')

logDir="/tmp/log/exec_spark_sql/${currDate}"
logFile="${logDir}/${jobName}_$(date +"%Y%m%d%H%M%S").log"

CheckDir() {
    if [ -d $1 ];then
        echo "Directory $1 exists."
    else
        echo "Directory $1 does not exist, creating it: mkdir -p -m 775 $1"
        mkdir -p -m 775 $1
        if [ $? -ne 0 ];then
            echo "Failed to create directory!"
            exit 1
        else
            echo "Directory created."
        fi
    fi
}

Log() {
    createTime=$(date +"%Y-%m-%d %H:%M:%S")
    echo -e "[${createTime}] $*" | tee -a ${logFile}
}

CheckDir "${logDir}"

shell_start_date=$(date +%s)
Log "Shell_name:$0"
Log "Shell_para:$@"

# Obtain a Kerberos ticket before talking to YARN/HDFS.
kinit -kt ${keyTab} ${principal}

# The queue read from principal.conf is passed explicitly to YARN.
exec_cmd="spark-submit --master yarn --deploy-mode cluster \
    --driver-memory 4G --executor-memory 8G --executor-cores 4 \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
    --conf spark.dynamicAllocation.enabled=true \
    --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=2147483647s \
    --conf spark.dynamicAllocation.minExecutors=3 \
    --conf spark.dynamicAllocation.maxExecutors=500 \
    --conf spark.sql.adaptive.enabled=true \
    --conf spark.shuffle.service.enabled=true \
    --conf spark.kryoserializer.buffer.max=128MB \
    --conf spark.executor.memoryOverhead=4096M \
    --conf spark.yarn.maxAppAttempts=1 \
    --conf yarn.resourcemanager.am.max-attempts=1 \
    --queue ${queue} \
    --principal ${principal} \
    --keytab ${keyTab} \
    --name ${jobName} \
    --files ${sqlFile} \
    ${shellDir}/bin/run_spark_sql.py ${jobName} ${sqlFile} ${currDate}"

Log "Starting: ${shellDir}/bin/run_spark_sql.py ${jobName} ${sqlFile} ${currDate}"
Log "Submit command:"
echo "${exec_cmd}" | tee -a ${logFile}

spark_start=$(date +%s)
res=$(${exec_cmd} 2>&1)
resCode=$?
spark_end=$(date +%s)
time=$(expr ${spark_end} - ${spark_start})
Log "Spark job finished, took ${time}s"

if [ ${resCode} -ne 0 ];then
    Log "Spark job failed!"
else
    Log "Spark job succeeded."
fi

# Parse the applicationId out of the spark-submit output, then pull the driver
# stdout section from the aggregated YARN log.
appid=$(echo "${res}" | grep -i 'proxy/application' | grep -o -E "application_[0-9]+_[0-9]+" | head -n 1)
Log "appid:${appid}"
if [ "X${appid}" == "X" ];then
    Log "Failed to get the applicationId, the output was:"
    Log "res:${res}"
else
    Log "The Spark job's applicationId is ${appid}"
    Log "============================== YARN log =============================="
    printFlag=0
    # Print only the lines between the stdout markers (the log type name depends
    # on how log aggregation is configured on the cluster).
    yarn logs -applicationId ${appid} | while read line
    do
        if [[ ${line} =~ ^'LogType:stdout.ext'$ ]];then
            printFlag=1
        fi
        if [[ ${line} =~ ^'End of LogType:stdout.ext'$ ]];then
            printFlag=0
        fi
        if [[ ${printFlag} == 1 ]];then
            echo "${line}" | tee -a ${logFile}
        fi
    done
    Log "End of LogType:stdout.ext"
fi

shell_end_date=$(date +%s)
shell_time=$(expr ${shell_end_date} - ${shell_start_date})
Log "Spark job took ${time}s"
Log "Done, exit status ${resCode}, total runtime ${shell_time}s"
exit ${resCode}
```
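After a run, the local log sits in the date-stamped directory created by CheckDir, and the full aggregated YARN log can be pulled manually with the applicationId printed above. A hypothetical example (the applicationId is a placeholder):

```bash
# Local log written by the Log() function; jobName is "<script name>_<sql file name>".
ls -lt /tmp/log/exec_spark_sql/$(date +%Y%m%d)/

# Full YARN log, if more than the driver stdout section is needed.
yarn logs -applicationId application_1700000000000_0001 | less
```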
Python script
Script name: run_spark_sql.py
Source code:
```python
from pyspark.sql import SparkSession
import sys, os
import time
import re


def main(job_name, sqlfile, job_date):
    sql_file_name = os.path.split(sqlfile)[1]
    spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    sc = spark.sparkContext
    print("sql_file_name:" + sql_file_name)
    print("SPARK_YARN_STAGING_DIR:" + os.getenv('SPARK_YARN_STAGING_DIR'))
    # The SQL file was shipped with --files, so it is read back from the job's
    # YARN staging directory.
    sql_content = '\n'.join(
        sc.textFile(os.getenv('SPARK_YARN_STAGING_DIR') + "/" + sql_file_name).collect())
    # Split into individual statements at a semicolon followed by a line break,
    # e.g. "SHOW DATABASES;\nSELECT 1" -> ["SHOW DATABASES", "SELECT 1"].
    sql_statements = re.split(r';\s*\n', sql_content)
    sql_start_time = time.time()
    for index, sql in enumerate(sql_statements):
        sql = sql.strip()
        if not sql:
            continue
        start_time = time.time()
        print(f"Executing SQL {index + 1}: {sql}")
        spark.sql(sql).show()
        end_time = time.time()
        print(f"SQL {index + 1} took {end_time - start_time:.2f}s")
    sql_end_time = time.time()
    print(f"All SQL statements took {sql_end_time - sql_start_time:.2f}s")
    spark.stop()


if __name__ == "__main__":
    print("Exec run_spark_sql.py")
    if len(sys.argv) < 4:
        print("Invalid arguments. Usage: spark-submit run_spark_sql.py job_name sqlfile job_date")
        sys.exit(-1)
    job_name = sys.argv[1]
    sqlfile = sys.argv[2]
    job_date = sys.argv[3]
    print("Received arguments: job_name[" + job_name + "] sqlfile[" + sqlfile + "] job_date[" + job_date + "]")
    main(job_name, sqlfile, job_date)
```