如下所示:
import java.text.DecimalFormat import com.alibaba.fastjson.JSON import com.donews.data.AppConfig import com.typesafe.config.ConfigFactory import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.{Row, SaveMode, DataFrame, SQLContext} import org.apache.spark.{SparkConf, SparkContext} import org.slf4j.LoggerFactory /** * Created by silentwolf on 2016/6/3. */ case class UserTag(SUUID: String, MAN: Float, WOMAN: Float, AGE10_19: Float, AGE20_29: Float, AGE30_39: Float, AGE40_49: Float, AGE50_59: Float, GAME: Float, MOVIE: Float, MUSIC: Float, ART: Float, POLITICS_NEWS: Float, FINANCIAL: Float, EDUCATION_TRAINING: Float, HEALTH_CARE: Float, TRAVEL: Float, AUTOMOBILE: Float, HOUSE_PROPERTY: Float, CLOTHING_ACCESSORIES: Float, BEAUTY: Float, IT: Float, BABY_PRODUCT: Float, FOOD_SERVICE: Float, HOME_FURNISHING: Float, SPORTS: Float, OUTDOOR_ACTIVITIES: Float, MEDICINE: Float ) object UserTagTable { val LOG = LoggerFactory.getLogger(UserOverviewFirst.getClass) val REP_HOME = s"${AppConfig.HDFS_MASTER}/${AppConfig.HDFS_REP}" def main(args: Array[String]) { var startTime = System.currentTimeMillis() val conf: com.typesafe.config.Config = ConfigFactory.load() val sc = new SparkContext() val sqlContext = new SQLContext(sc) var df1: DataFrame = null if (args.length == 0) { println("请输入: appkey , StartTime : 2016-04-10 ,StartEnd :2016-04-11") } else { var appkey = args(0) var lastdate = args(1) df1 = loadDataFrame(sqlContext, appkey, "2016-04-10", lastdate) df1.registerTempTable("suuidTable") sqlContext.udf.register("taginfo", (a: String) => userTagInfo(a)) sqlContext.udf.register("intToString", (b: Long) => intToString(b)) import sqlContext.implicits._ //***重点***:将临时表中的suuid和自定函数中Json数据,放入UserTag中。 sqlContext.sql(" select distinct(suuid) AS suuid,taginfo(suuid) from suuidTable group by suuid").map { case Row(suuid: String, taginfo: String) => val taginfoObj = JSON.parseObject(taginfo) UserTag(suuid.toString, taginfoObj.getFloat("man"), taginfoObj.getFloat("woman"), taginfoObj.getFloat("age10_19"), taginfoObj.getFloat("age20_29"), taginfoObj.getFloat("age30_39"), taginfoObj.getFloat("age40_49"), taginfoObj.getFloat("age50_59"), taginfoObj.getFloat("game"), taginfoObj.getFloat("movie"), taginfoObj.getFloat("music"), taginfoObj.getFloat("art"), taginfoObj.getFloat("politics_news"), taginfoObj.getFloat("financial"), taginfoObj.getFloat("education_training"), taginfoObj.getFloat("health_care"), taginfoObj.getFloat("travel"), taginfoObj.getFloat("automobile"), taginfoObj.getFloat("house_property"), taginfoObj.getFloat("clothing_accessories"), taginfoObj.getFloat("beauty"), taginfoObj.getFloat("IT"), taginfoObj.getFloat("baby_Product"), taginfoObj.getFloat("food_service"), taginfoObj.getFloat("home_furnishing"), taginfoObj.getFloat("sports"), taginfoObj.getFloat("outdoor_activities"), taginfoObj.getFloat("medicine") )}.toDF().registerTempTable("resultTable") val resultDF = sqlContext.sql(s"select '$appkey' AS APPKEY, '$lastdate' AS DATE,SUUID ,MAN,WOMAN,AGE10_19,AGE20_29,AGE30_39 ," + "AGE40_49 ,AGE50_59,GAME,MOVIE,MUSIC,ART,POLITICS_NEWS,FINANCIAL,EDUCATION_TRAINING,HEALTH_CARE,TRAVEL,AUTOMOBILE," + "HOUSE_PROPERTY,CLOTHING_ACCESSORIES,BEAUTY,IT,BABY_PRODUCT ,FOOD_SERVICE ,HOME_FURNISHING ,SPORTS ,OUTDOOR_ACTIVITIES ," + "MEDICINE from resultTable WHERE SUUID IS NOT NULL") resultDF.write.mode(SaveMode.Overwrite).options( Map("table" -> "USER_TAGS", "zkUrl" -> conf.getString("Hbase.url")) ).format("org.apache.phoenix.spark").save() } } def intToString(suuid: Long): String = { suuid.toString() } def userTagInfo(num1: String): String = { var de = new DecimalFormat("0.00") var mannum = de.format(math.random).toFloat var man = mannum var woman = de.format(1 - mannum).toFloat var age10_19num = de.format(math.random * 0.2).toFloat var age20_29num = de.format(math.random * 0.2).toFloat var age30_39num = de.format(math.random * 0.2).toFloat var age40_49num = de.format(math.random * 0.2).toFloat var age10_19 = age10_19num var age20_29 = age20_29num var age30_39 = age30_39num var age40_49 = age40_49num var age50_59 = de.format(1 - age10_19num - age20_29num - age30_39num - age40_49num).toFloat var game = de.format(math.random * 1).toFloat var movie = de.format(math.random * 1).toFloat var music = de.format(math.random * 1).toFloat var art = de.format(math.random * 1).toFloat var politics_news = de.format(math.random * 1).toFloat var financial = de.format(math.random * 1).toFloat var education_training = de.format(math.random * 1).toFloat var health_care = de.format(math.random * 1).toFloat var travel = de.format(math.random * 1).toFloat var automobile = de.format(math.random * 1).toFloat var house_property = de.format(math.random * 1).toFloat var clothing_accessories = de.format(math.random * 1).toFloat var beauty = de.format(math.random * 1).toFloat var IT = de.format(math.random * 1).toFloat var baby_Product = de.format(math.random * 1).toFloat var food_service = de.format(math.random * 1).toFloat var home_furnishing = de.format(math.random * 1).toFloat var sports = de.format(math.random * 1).toFloat var outdoor_activities = de.format(math.random * 1).toFloat var medicine = de.format(math.random * 1).toFloat "{" + "\"man\"" + ":" + man + "," + "\"woman\"" + ":" + woman + "," + "\"age10_19\"" + ":" + age10_19 + "," + "\"age20_29\"" + ":" + age20_29 + "," + "\"age30_39\"" + ":" + age30_39 + "," + "\"age40_49\"" + ":" + age40_49 + "," + "\"age50_59\"" + ":" + age50_59 + "," + "\"game\"" + ":" + game + "," + "\"movie\"" + ":" + movie + "," + "\"music\"" + ":" + music + "," + "\"art\"" + ":" + art + "," + "\"politics_news\"" + ":" + politics_news + "," + "\"financial\"" + ":" + financial + "," + "\"education_training\"" + ":" + education_training + "," + "\"health_care\"" + ":" + health_care + "," + "\"travel\"" + ":" + travel + "," + "\"automobile\"" + ":" + automobile + "," + "\"house_property\"" + ":" + house_property + "," + "\"clothing_accessories\"" + ":" + clothing_accessories + "," + "\"beauty\"" + ":" + beauty + "," + "\"IT\"" + ":" + IT + "," + "\"baby_Product\"" + ":" + baby_Product + "," + "\"food_service\"" + ":" + food_service + "," + "\"home_furnishing\"" + ":" + home_furnishing + "," + "\"sports\"" + ":" + sports + "," + "\"outdoor_activities\"" + ":" + outdoor_activities + "," + "\"medicine\"" + ":" + medicine + "}"; } def loadDataFrame(ctx: SQLContext, appkey: String, startDay: String, endDay: String): DataFrame = { val path = s"$REP_HOME/appstatistic" ctx.read.parquet(path) .filter(s"timestamp is not null and appkey='$appkey' and day>='$startDay' and day<='$endDay'") } }
以上这篇DataFrame:通过SparkSql将scala类转为DataFrame的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。
广告合作:本站广告合作请联系QQ:858582 申请时备注:广告合作(否则不回)
免责声明:本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除!
免责声明:本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除!
暂无评论...
稳了!魔兽国服回归的3条重磅消息!官宣时间再确认!
昨天有一位朋友在大神群里分享,自己亚服账号被封号之后居然弹出了国服的封号信息对话框。
这里面让他访问的是一个国服的战网网址,com.cn和后面的zh都非常明白地表明这就是国服战网。
而他在复制这个网址并且进行登录之后,确实是网易的网址,也就是我们熟悉的停服之后国服发布的暴雪游戏产品运营到期开放退款的说明。这是一件比较奇怪的事情,因为以前都没有出现这样的情况,现在突然提示跳转到国服战网的网址,是不是说明了简体中文客户端已经开始进行更新了呢?
更新日志
2024年11月26日
2024年11月26日
- 凤飞飞《我们的主题曲》飞跃制作[正版原抓WAV+CUE]
- 刘嘉亮《亮情歌2》[WAV+CUE][1G]
- 红馆40·谭咏麟《歌者恋歌浓情30年演唱会》3CD[低速原抓WAV+CUE][1.8G]
- 刘纬武《睡眠宝宝竖琴童谣 吉卜力工作室 白噪音安抚》[320K/MP3][193.25MB]
- 【轻音乐】曼托凡尼乐团《精选辑》2CD.1998[FLAC+CUE整轨]
- 邝美云《心中有爱》1989年香港DMIJP版1MTO东芝首版[WAV+CUE]
- 群星《情叹-发烧女声DSD》天籁女声发烧碟[WAV+CUE]
- 刘纬武《睡眠宝宝竖琴童谣 吉卜力工作室 白噪音安抚》[FLAC/分轨][748.03MB]
- 理想混蛋《Origin Sessions》[320K/MP3][37.47MB]
- 公馆青少年《我其实一点都不酷》[320K/MP3][78.78MB]
- 群星《情叹-发烧男声DSD》最值得珍藏的完美男声[WAV+CUE]
- 群星《国韵飘香·贵妃醉酒HQCD黑胶王》2CD[WAV]
- 卫兰《DAUGHTER》【低速原抓WAV+CUE】
- 公馆青少年《我其实一点都不酷》[FLAC/分轨][398.22MB]
- ZWEI《迟暮的花 (Explicit)》[320K/MP3][57.16MB]