import org.apache.spark.ml.linalg.DenseVector
// Convert the ML "features" vector column into a plain Array[Double] column.
// NOTE(review): the cast only succeeds for DenseVector values — a SparseVector
// in the column would fail with a ClassCastException at runtime.
val toArr: Any => Array[Double] = (raw: Any) => raw.asInstanceOf[DenseVector].toArray
val toArrUdf = udf(toArr)
val dataWithFeaturesArr = dataWithFeatures.withColumn("features_arr", toArrUdf('features))
但是如果读者不清楚自己的向量是哪个,那就尽可能去兼容一下,如下
import org.apache.spark.ml.linalg.{SparseVector, Vector}
import org.apache.spark.mllib.linalg.{Vector => OldVector}
// Tolerant vector-to-array conversion: accepts both the new `ml` and the old
// `mllib` vector types, so callers need not know which flavour their column holds.
// Any other value (including null) is rejected with a descriptive error.
private val vectorToArrayUdf = udf { vec: Any =>
  vec match {
    case newVec: Vector    => newVec.toArray
    case oldVec: OldVector => oldVec.toArray
    case other =>
      throw new IllegalArgumentException(
        "function vector_to_array requires a non-null input argument and input type must be " +
        "`org.apache.spark.ml.linalg.Vector` or `org.apache.spark.mllib.linalg.Vector`, " +
        s"but got ${ if (other == null) "null" else other.getClass.getName }.")
  }
}.asNonNullable()
/**
 * Current date in the default time zone, formatted as "yyyy-MM-dd".
 *
 * Idiom fix: the original used three needless `var`s for purely local,
 * never-reassigned values.
 *
 * @return today's date, e.g. "2024-01-31"
 */
def getNowDate(): String = {
  val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
  dateFormat.format(new Date())
}
2、获取昨天的日期
/**
 * Yesterday's date in the default time zone, formatted as "yyyy-MM-dd".
 *
 * Fix: the original definition was missing its closing brace, and used
 * `var` for never-reassigned locals.
 *
 * @return yesterday's date, e.g. "2024-01-30"
 */
def getYesterday(): String = {
  val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
  val cal = Calendar.getInstance()
  cal.add(Calendar.DATE, -1)
  dateFormat.format(cal.getTime)
}
3、获取本周开始日期
/**
 * First day (Monday) of the current week, formatted as "yyyy-MM-dd".
 *
 * NOTE(review): `Calendar.set(DAY_OF_WEEK, MONDAY)` is locale-sensitive —
 * in locales where the week starts on Sunday, calling this on a Sunday can
 * resolve to the following Monday. Confirm this matches the intended
 * "start of this week" semantics.
 *
 * @return Monday of the current week, e.g. "2024-01-29"
 */
def getNowWeekStart(): String = {
  val cal = Calendar.getInstance()
  val df = new SimpleDateFormat("yyyy-MM-dd")
  cal.set(Calendar.DAY_OF_WEEK, Calendar.MONDAY)
  df.format(cal.getTime)
}
4、获取本周末的时间
/**
 * Last day (Sunday) of the current week, formatted as "yyyy-MM-dd".
 *
 * `Calendar` treats Sunday as the first day of the week, so setting
 * DAY_OF_WEEK to SUNDAY lands on the *previous* Sunday; adding one week
 * moves it to the Sunday that closes the current (Monday-based) week.
 *
 * @return Sunday ending the current week, e.g. "2024-02-04"
 */
def getNowWeekEnd(): String = {
  val cal = Calendar.getInstance()
  val df = new SimpleDateFormat("yyyy-MM-dd")
  cal.set(Calendar.DAY_OF_WEEK, Calendar.SUNDAY)
  cal.add(Calendar.WEEK_OF_YEAR, 1)
  df.format(cal.getTime)
}
5、本月的第一天
/**
 * First day of the current month, formatted as "yyyy-MM-dd".
 *
 * @return e.g. "2024-02-01"
 */
def getNowMonthStart(): String = {
  val cal = Calendar.getInstance()
  val df = new SimpleDateFormat("yyyy-MM-dd")
  cal.set(Calendar.DATE, 1) // jump to day 1 of the current month
  df.format(cal.getTime)
}
6、本月的最后一天
/**
 * Last day of the current month, formatted as "yyyy-MM-dd".
 *
 * Setting DATE to 1 and then `roll`-ing it back by one wraps *within the
 * same month* (roll never touches larger fields), landing on the month's
 * final day.
 *
 * @return e.g. "2024-02-29"
 */
def getNowMonthEnd(): String = {
  val cal = Calendar.getInstance()
  val df = new SimpleDateFormat("yyyy-MM-dd")
  cal.set(Calendar.DATE, 1)
  cal.roll(Calendar.DATE, -1)
  df.format(cal.getTime)
}
7、将时间戳转化成日期 时间戳是秒数,需要乘以1000l转化成毫秒
/**
 * Convert a Unix timestamp (seconds, as a decimal string) to a "yyyy-MM-dd"
 * date in the default time zone.
 *
 * The input is in seconds, so it is multiplied by 1000 to get milliseconds.
 * Idiom fixes: `1000L` instead of the lowercase `1000l` (easily misread as
 * the digit one); `val`s instead of `var`s. The non-camelCase name is kept
 * for caller compatibility.
 *
 * @param time Unix timestamp in seconds, e.g. "1234567890"
 * @return formatted date string
 * @throws NumberFormatException if `time` is not a parsable long
 */
def DateFormat(time: String): String = {
  val sdf = new SimpleDateFormat("yyyy-MM-dd")
  sdf.format(new Date(time.toLong * 1000L))
}
8、时间戳转化为时间,原理同上
/**
 * Convert a Unix timestamp (seconds, as a decimal string) to an "HH:mm:ss"
 * time-of-day in the default time zone. Same principle as [[DateFormat]]:
 * seconds are multiplied by 1000 to get milliseconds.
 *
 * @param time Unix timestamp in seconds, e.g. "1234567890"
 * @return formatted time-of-day string
 * @throws NumberFormatException if `time` is not a parsable long
 */
def timeFormat(time: String): String = {
  val sdf = new SimpleDateFormat("HH:mm:ss")
  sdf.format(new Date(time.toLong * 1000L))
}
9、计算时间差
//核心工作时间,迟到早退等的的处理
/**
 * Elapsed time between two "HH:mm:ss" clock times, in hours, formatted with
 * two decimal places (e.g. "1.50"). Intended for core-working-hours /
 * late-arrival calculations.
 *
 * Idiom fixes: `val`s instead of `var`s, explicit `String` return type.
 * Parameter names are kept (including the odd `end_Time` casing) for
 * caller compatibility.
 *
 * @param start_time start time, "HH:mm:ss"
 * @param end_Time   end time, "HH:mm:ss"
 * @return hour difference formatted with pattern "#.00" (negative if
 *         `end_Time` precedes `start_time`)
 * @throws java.text.ParseException if either argument is malformed
 */
def getCoreTime(start_time: String, end_Time: String): String = {
  val df = new SimpleDateFormat("HH:mm:ss")
  // millisecond difference -> whole seconds -> fractional hours
  val seconds = (df.parse(end_Time).getTime - df.parse(start_time).getTime) / 1000
  val hours = seconds.toFloat / 3600
  new DecimalFormat("#.00").format(hours)
}
// Feature column names to be assembled into the vector.
var features = Array("weight", "height", "age")
// Assemble the raw columns into a single "vector_features" column, then
// randomly split the rows 0.4/0.3/0.3 (train / candidate / query sets).
// NOTE(review): randomSplit is unseeded here, so the split differs per run.
var splitDatas = new VectorAssembler()
.setInputCols(features)
.setOutputCol("vector_features")
.transform(dataFrame.select("id", features:_*))
.randomSplit(Array(0.4, 0.3, 0.3))
// Train the LSH model on the first split.
var model:BucketedRandomProjectionLSHModel = new BucketedRandomProjectionLSH()
.setInputCol("vector_features") // input feature column to be hashed
.setOutputCol("bkt_lsh") // output column holding the hash values
.setBucketLength(10d) // length of each hash bucket; larger buckets lower the false-negative rate
.setNumHashTables(5) // number of hash tables; more tables lower the false-negative rate, fewer tables run faster
.setSeed(100L) // random seed (makes the hash functions reproducible)
.fit(splitDatas.apply(0)) // train on split 0
// Apply the trained model to the training split and inspect the result.
var transform = model.transform(splitDatas.apply(0))
transform.show(10, 100)
transform.printSchema()
// Recommendations: approximate-join the two held-out splits, keeping pairs
// whose distance is below the threshold (2), i.e. the most similar rows.
var recommend= model.approxSimilarityJoin(splitDatas.apply(1), splitDatas.apply(2), 2, "distCol")
.select(
col("datasetA").getField("id").as("id"),
col("datasetB").getField("id").as("recommend_id"),
col("datasetA").getField("age").as("age"),
col("datasetB").getField("age").as("recommend_age"),
col("datasetA").getField("weight").as("weight"),
col("datasetB").getField("weight").as("recommend_weight"),
col("datasetA").getField("height").as("height"),
col("datasetB").getField("height").as("recommend_height"),
col("distCol")
)
recommend.orderBy("id", "distCol").show(100, 1000)
连续特征代码示例
// df is a DataFrame whose "words" column is a token list (Array[String]),
// e.g. ["我","爱","北京","天安门"].
val word2Vec = new Word2Vec()
.setInputCol("words")
.setOutputCol("wordvec")
.setVectorSize(10)
.setMinCount(0)
// Fit word2vec and append the 10-dimensional "wordvec" embedding column.
val wvModel = word2Vec.fit(df)
val w2vDf = wvModel.transform(df)
// Bucketed random-projection LSH over the embeddings; more hash tables
// lower the false-negative rate at the cost of runtime.
val brp = new BucketedRandomProjectionLSH()
.setBucketLength(4d)
.setNumHashTables(10)
.setInputCol("wordvec")
.setOutputCol("hashes")
val brpModel = brp.fit(w2vDf)
val tsDf = brpModel.transform(w2vDf)
// Query vector — NOTE(review): its length must equal the word2vec
// vector size (10 here); presumably taken from an earlier run.
val key_value = List(0.13868775751827092,-0.11639275898904025,0.19808808788014898,0.2722799372859299,0.08275626220836721,-0.2846828463712129,0.2887565325463897,-0.05958885527697617,0.042977130971848965,-0.03787828763497287)
val key = Vectors.dense(key_value.toArray)
// Approximate top-3 nearest neighbours of `key` in the hashed dataset.
val a = brpModel.approxNearestNeighbors(tsDf, key , 3).toDF()