In this article we give a detailed introduction to scipy.cluster.hierarchy.linkage and answer the question of using a distance matrix with it. In addition, we provide useful information on akka cluster - akka cluster bootstrap - Local using config, Akka-Cluster (3) - ClusterClient, the cluster client, Akka source code analysis - Cluster - ClusterClient, and the Android tool HierarchyViewer code walkthrough (2) -- setting up an Eclipse debugging environment.
Contents of this article:
- Using a distance matrix in scipy.cluster.hierarchy.linkage()? (Python distance matrix)
- akka cluster - akka cluster bootstrap - Local using config
- Akka-Cluster (3) - ClusterClient, the cluster client
- Akka source code analysis - Cluster - ClusterClient
- Android tool HierarchyViewer code walkthrough (2) -- setting up an Eclipse debugging environment
Using a distance matrix in scipy.cluster.hierarchy.linkage()? (Python distance matrix)
I have an N*N distance matrix M, where M_ij is the distance between object_i and object_j. So, as expected, it takes the following form:
/  0     M_01  M_02  ...  M_0n  \
|  M_10  0     M_12  ...  M_1n  |
|  M_20  M_21  0     ...  M_2n  |
|  ...                          |
\  M_n0  M_n1  M_n2  ...  0     /
Now I want to cluster these n objects with hierarchical clustering. Python has an implementation for this called scipy.cluster.hierarchy.linkage(y, method='single', metric='euclidean').
Its documentation says:
y must be a {n \choose 2} sized vector where n is the number of original observations paired in the distance matrix.
y : ndarray
A condensed or redundant distance matrix. A condensed distance matrix is a flat array containing the upper triangular of the distance matrix. This is the form that pdist returns. Alternatively, a collection of m observation vectors in n dimensions may be passed as an m by n array.
This description of y confuses me. Can I just pass my M directly as the input y?
Update
@hongbo-zhu-cn has raised this issue on GitHub. This is exactly what I am concerned about. However, as a newcomer to GitHub, I do not know how it works and therefore have no idea how this issue was handled.
Answer 1
It seems that you indeed cannot pass the redundant square matrix directly, although the documentation claims you can.
To benefit anyone who runs into the same problem in the future, I am writing my solution here as an additional answer, so the copy-and-paste folks can get on with their clustering.
Use the following snippet to condense the matrix and proceed happily.
import scipy.spatial.distance as ssd
from scipy.cluster.hierarchy import linkage

# convert the redundant n*n square matrix form into a condensed nC2 array
distArray = ssd.squareform(distMatrix)
# distArray[{n choose 2}-{n-i choose 2} + (j-i-1)] is the distance between points i and j

# the condensed array is what linkage() expects:
Z = linkage(distArray, method='single')
Correct me if I am wrong.
akka cluster - akka cluster bootstrap - Local using config
application.conf
akka {
loglevel = INFO
actor {
provider = "cluster"
}
remote {
artery {
enabled = on
transport = tcp
canonical.port = 2551
}
}
}
#coordinated-shutdown
akka.cluster.shutdown-after-unsuccessful-join-seed-nodes = 30s
akka.coordinated-shutdown.exit-jvm = on
#coordinated-shutdown
#discovery
akka.discovery {
config.services = {
local-cluster = {
endpoints = [
{
host = "127.0.0.1"
port = 8558
},
{
host = "127.0.0.2"
port = 8558
},
{
host = "127.0.0.3"
port = 8558
}
]
}
}
}
#discovery
#health
akka.management.health-checks {
readiness-path = "health/ready"
liveness-path = "health/alive"
}
#health
akka.management.http {
route-providers-read-only = false
}
#bootstrap
akka.management {
cluster.bootstrap {
contact-point-discovery {
service-name = "local-cluster"
discovery-method = config
required-contact-point-nr =1
contact-with-all-contact-points = false
}
}
}
#bootstrap
Java code snippet
package com.muyu.akka.demo;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import akka.actor.ActorSystem;
import akka.cluster.Cluster;
import akka.management.cluster.bootstrap.ClusterBootstrap;
import akka.management.scaladsl.AkkaManagement;
public class Demo01 {
public static void main(String[] args) {
StringBuilder sb = new StringBuilder();
sb.append("akka.remote.artery.canonical.hostname=127.0.0.1\n");
sb.append("akka.management.http.hostname=127.0.0.1\n");
//Config config = ConfigFactory.parseString("akka.remote.netty.tcp.port=2011\nakka.remote.artery.canonical.port=2011").withFallback(ConfigFactory.load());
Config config = ConfigFactory.parseString(sb.toString()).withFallback(ConfigFactory.load());
//Config config = ConfigFactory.load();
ActorSystem system = ActorSystem.create("local-cluster", config);
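// Akka Management starts the HTTP endpoint that serves the bootstrap contact-point routes and the health checks configured above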
AkkaManagement.get(system).start();
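// Cluster Bootstrap resolves the contact points through the configured discovery method and forms/joins the cluster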
ClusterBootstrap.get(system).start();
Cluster cluster = Cluster.get(system);
cluster.registerOnMemberUp(new Runnable() {
@Override
public void run() {
System.out.println("-->");
}
});
// Create an
//ActorRef actorRef = system.actorOf(Props.create(SimpleClusterListener.class), "ClusterListener");
//System.out.println(actorRef.toString());
}
}
Akka-Cluster (3) - ClusterClient, the cluster client
In the previous article we introduced the distributed pub/sub messaging mechanism. It is a message-sharing mechanism within a single cluster: publishers and subscribers all live on nodes of the same cluster, and the DistributedPubSubMediator on each node builds the underlying message channels through the cluster's internal communication, achieving location transparency at the actor pub/sub level. In practice, however, many front ends act as clients of a cluster while staying outside it, and two independent clusters may also need to interact, so the client and the server may not be in the same cluster. ClusterClient is the solution for communication between actors outside a cluster and actors inside it.
In effect, the ClusterClient pattern represents a service style built on the message publish/subscribe mechanism: the client requests services by sending messages, and the server receives those request messages and performs the corresponding computation.
We can split the cluster-client pattern into the client side, ClusterClient, and the server side, ClusterClientReceptionist. As the name suggests, the receptionist is the role that receives service requests initiated by clients outside the cluster. ClusterClientReceptionist is deployed on all nodes of the cluster (or on nodes with a selected role); each one plugs into the local DistributedPubSubMediator to form the upper layer of message subscribers, and the connection between ClusterClient and ClusterClientReceptionist then forms a unified cluster environment that realizes the distributed pub/sub mechanism discussed in the previous article.
ClusterClient is the message publisher: an actor running on a machine outside the target cluster. When an actor on that machine needs to send messages to actors inside the cluster, it can do so through the channel built between the local ClusterClient actor and the cluster's ClusterClientReceptionists, which deliver the messages to actors registered with the DistributedPubSubMediator connected to a receptionist. A machine that acts as a cluster client must therefore run the ClusterClient service (this actor) locally; it is one end of the communication bridge.
At startup, ClusterClient connects to ClusterClientReceptionists at pre-configured addresses (contact points), then maintains its internal contact list from the list published by the receptionists; this list can be persisted and used to reconnect to the cluster after a restart. Once connected, ClusterClient monitors its counterpart and automatically replaces the concrete ClusterClientReceptionist when necessary. ClusterClient publishes messages wrapped in three kinds of structures:
1. ClusterClient.Send
2. ClusterClient.SendToAll
3. ClusterClient.Publish
These were already discussed in the previous article, so we skip the details here; a minimal usage sketch follows.
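As a quick reminder, here is a minimal sketch of the three envelopes (the client ActorRef and the DoThis/DoThat message types are assumed, as in the examples further below):
client ! ClusterClient.Send("/user/serviceA", DoThis, localAffinity = true) // one actor registered at the path
client ! ClusterClient.SendToAll("/user/serviceB", DoThat)                  // every actor registered at the path
client ! ClusterClient.Publish("topic", DoThat)                            // every actor subscribed to the topic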
ClusterClientReceptionist is the message-receiving interface inside the cluster. Actors in the cluster that need to receive messages must register their addresses with the local DistributedPubSubMediator, which is how ClusterClientReceptionist obtains the address list of all service actors in the cluster. The receiver information specified in a message published through ClusterClient determines which actor finally receives the message and provides the service. Service registration looks like this:
// register service A
val serviceA = system.actorOf(Props[Service], "serviceA")
ClusterClientReceptionist(system).registerService(serviceA)
// register service B
val serviceB = system.actorOf(Props[Service], "serviceB")
ClusterClientReceptionist(system).registerService(serviceB)
An example of invoking services from a ClusterClient:
val client = system.actorOf(ClusterClient.props(
ClusterClientSettings(system).withInitialContacts(initialContacts)), "client")
client ! ClusterClient.Send("/user/serviceA", DoThis, localAffinity = true)
client ! ClusterClient.SendToAll("/user/serviceB", DoThat)
Note: the registered service actors must handle the DoThis and DoThat messages and implement the corresponding computation.
In a concrete application, note what sender() actually means: seen from the service actor, sender() is the ClusterClientReceptionist; seen from the publishing actor, sender() is DeadLetter. If the service actor needs the concrete address of the requester, the publisher can embed its own address in the published message structure, as sketched below.
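A minimal sketch of embedding the reply address (the ShoutRequest type and its fields are illustrative assumptions, not code from this article):
case class ShoutRequest(text: String, replyTo: ActorRef)

class CatWithReply extends Actor with ActorLogging {
  override def receive: Receive = {
    case ShoutRequest(text, replyTo) =>
      replyTo ! s"MIAOM ($text)" // answer the embedded address instead of sender()
  }
}

// client side: embed self (the sending actor's own ActorRef) in the message
clusterClient ! ClusterClient.Send("/user/CatSound", ShoutRequest("Shout", self), localAffinity = true)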
Below we demonstrate this with a simple example. First we design two service actors, Cat and Dog, and let them provide different animal sounds as their services:
class Cat extends Actor with ActorLogging {
// set up in pub/sub style
val mediator = DistributedPubSub(context.system).mediator
override def preStart() = {
mediator ! Subscribe("Shout", self)
super.preStart()
}
override def receive: Receive = {
case "Shout" =>
log.info("*******I am a cat, MIAOM ...******")
}
}
class Dog extends Actor with ActorLogging {
// set up in pub/sub style
val mediator = DistributedPubSub(context.system).mediator
override def preStart() = {
mediator ! Subscribe("Shout", self)
super.preStart()
}
override def receive: Receive = {
case "Shout" =>
log.info("*****I am a dog, WANG WANG...*****")
}
}
As you can see, these are just two ordinary actors. We can also tie this back to the distributed pub/sub of the previous article to verify that cluster-client is built on top of distributed pub/sub. Next we place these two actors (services) on different cluster nodes:
object Cat {
def props = Props[Cat]
def create(port: Int): ActorSystem = {
val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
.withFallback(ConfigFactory.load())
val system = ActorSystem("ClusterSystem",config)
val catSound = system.actorOf(props,"CatSound")
ClusterClientReceptionist(system).registerService(catSound)
system
}
}
object Dog {
def props = Props(new Dog)
def create(port: Int): ActorSystem = {
val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
.withFallback(ConfigFactory.load())
val system = ActorSystem("ClusterSystem",config)
val dogSound = system.actorOf(props,"DogSound")
ClusterClientReceptionist(system).registerService(dogSound)
system
}
}
Note that the cluster is named ClusterSystem, and on each actor's node we registered the service via ClusterClientReceptionist.registerService. The conf used by this cluster is as follows:
akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off
akka {
loglevel = INFO
extensions = ["akka.cluster.client.ClusterClientReceptionist"]
actor {
provider = "cluster"
serializers {
java = "akka.serialization.JavaSerializer"
proto = "akka.remote.serialization.ProtobufSerializer"
}
serialization-bindings {
"java.lang.String" = java
"scalapb.GeneratedMessage" = proto
}
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551"]
log-info = off
}
}
This is a fairly complete cluster configuration document; only the port remains to be configured. Now run the two nodes:
object PetHouse extends App {
val sysCat = Cat.create(2551)
val sysDog = Dog.create(2552)
scala.io.StdIn.readLine()
sysCat.terminate()
sysDog.terminate()
}
This builds the Cat and Dog actors on nodes 2551 and 2552 and registers the services via ClusterClientReceptionist.registerService. Now for the client:
object PetClient extends App {
val conf = ConfigFactory.load("client")
val clientSystem = ActorSystem("ClientSystem",conf)
/* read the contact-points addresses from the conf file
val initialContacts = immutableSeq(conf.getStringList("contact-points")).map {
case AddressFromURIString(addr) ⇒ RootActorPath(addr) / "system" / "receptionist"
}.toSet
*/
// seed with one contact point; the system will add the other points automatically
val initialContacts = Set(
ActorPaths.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist")
)
val clusterClient = clientSystem.actorOf(
ClusterClient.props(
ClusterClientSettings(clientSystem)
.withInitialContacts(initialContacts)),
"petClient")
clusterClient ! Send("/user/CatSound","Shout",localAffinity = true)
clusterClient ! Send("/user/DogSound","Shout",localAffinity = true)
println(s"sent shout messages ...")
scala.io.StdIn.readLine()
clusterClient ! Publish("Shout","Shout")
println(s"publish shout messages ...")
scala.io.StdIn.readLine()
clientSystem.terminate();
}
The client's ActorSystem is named ClientSystem and lives outside the ClusterSystem cluster. Its conf file is as follows:
akka {
actor.provider = remote
remote.netty.tcp.port= 2553
remote.netty.tcp.hostname=127.0.0.1
}
contact-points = [
"akka.tcp://ClusterSystem@127.0.0.1:2551",
"akka.tcp://ClusterSystem@127.0.0.1:2552"]
Setting actor.provider=remote spares us from providing seed-nodes. The run results:
[INFO] [12/08/2018 09:32:51.432] [ClusterSystem-akka.actor.default-dispatcher-17] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/CatSound] *******I am a cat, MIAOM ...******
[INFO] [12/08/2018 09:32:51.435] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/DogSound] *****I am a dog, WANG WANG...*****
[INFO] [12/08/2018 09:33:44.113] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/CatSound] *******I am a cat, MIAOM ...******
[INFO] [12/08/2018 09:33:44.113] [ClusterSystem-akka.actor.default-dispatcher-23] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/DogSound] *****I am a dog, WANG WANG...*****
Both ClusterClient and the Receptionist emit messages about their own state. We can intercept these messages to do related work. See the following listener demo code:
package petsound
import akka.actor._
import akka.cluster.client._
class ClientListener(clusterClient: ActorRef) extends Actor with ActorLogging {
override def preStart(): Unit = {
clusterClient ! SubscribeContactPoints
super.preStart()
}
override def receive: Receive = {
case ContactPoints(cps) =>
cps.map {ap => log.info(s"*******ContactPoints:${ap.address.toString}******")}
case ContactPointAdded(cp) =>
log.info(s"*******ContactPointAdded: ${cp.address.toString}*******")
case ContactPointRemoved(cp) =>
log.info(s"*******ContactPointRemoved: ${cp.address.toString}*******")
}
}
class ReceptionistListener(receptionist: ActorRef) extends Actor with ActorLogging {
override def preStart(): Unit = {
receptionist ! SubscribeClusterClients
super.preStart()
}
override def receive: Receive = {
case ClusterClients(cs) =>
cs.map{aref => println(s"*******ClusterClients: ${aref.path.address.toString}*******")}
case ClusterClientUp(cc) =>
log.info(s"*******ClusterClientUp: ${cc.path.address.toString}*******")
case ClusterClientUnreachable(cc) =>
log.info(s"*******ClusterClientUnreachable: ${cc.path.address.toString}*******")
}
}
The two event listeners are installed like this:
val receptionist = ClusterClientReceptionist(system).underlying
system.actorOf(Props(classOf[ReceptionistListener],receptionist),"cat-event-listner")
val receptionist = ClusterClientReceptionist(system).underlying
system.actorOf(Props(classOf[ReceptionistListener],receptionist),"dog-event-listner")
val clusterClient = clientSystem.actorOf(
ClusterClient.props(
ClusterClientSettings(clientSystem)
.withInitialContacts(initialContacts)),
"petClient")
clientSystem.actorOf(Props(classOf[ClientListener],clusterClient),"client-event-listner")
The run results:
[INFO] [12/09/2018 09:42:40.838] [ClientSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClientSystem@127.0.0.1:2553/user/client-event-listner] *******ContactPoints:akka.tcp://ClusterSystem@127.0.0.1:2551******
[INFO] [12/09/2018 09:42:40.947] [ClientSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClientSystem@127.0.0.1:2553/user/client-event-listner] *******ContactPointAdded: akka.tcp://ClusterSystem@127.0.0.1:2552*******
[INFO] [12/09/2018 09:42:40.967] [ClientSystem-akka.actor.default-dispatcher-15] [akka.tcp://ClientSystem@127.0.0.1:2553/user/petClient] Connected to [akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist]
[INFO] [12/09/2018 09:42:40.979] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/cat-event-listner] *******ClusterClientUp: akka.tcp://ClientSystem@127.0.0.1:2553*******
[INFO] [12/09/2018 09:54:34.363] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/cat-event-listner] *******ClusterClientUnreachable: akka.tcp://ClientSystem@127.0.0.1:2553*******
Next, one more demo along the lines of the previous article's discussion: the cluster client sends MongoDB commands to a MongoDB service actor registered in the cluster via ClusterClientReceptionist; the service side receives the commands and runs the operations against MongoDB. Here is the MongoDB service actor:
package petsound
import akka.actor._
import com.typesafe.config._
import akka.actor.ActorSystem
import org.mongodb.scala._
import sdp.grpc.services.ProtoMGOContext
import sdp.mongo.engine.MGOClasses._
import sdp.mongo.engine.MGOEngine._
import sdp.result.DBOResult._
import akka.cluster.client._
import scala.collection.JavaConverters._
import scala.util._
class MongoAdder extends Actor with ActorLogging {
import monix.execution.Scheduler.Implicits.global
implicit val mgosys = context.system
implicit val ec = mgosys.dispatcher
val clientSettings: MongoClientSettings = MongoClientSettings.builder()
.applyToClusterSettings {b =>
b.hosts(List(new ServerAddress("localhost:27017")).asJava)
}.build()
implicit val client: MongoClient = MongoClient(clientSettings)
val ctx = MGOContext("testdb","friends")
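// note: this default context is shadowed by the context decoded from the incoming proto message in receive below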
override def receive: Receive = {
case someProto @ Some(proto:ProtoMGOContext) =>
val ctx = MGOContext.fromProto(proto)
log.info(s"****** received MGOContext: $someProto *********")
val task = mgoUpdate[Completed](ctx).toTask
task.runOnComplete {
case Success(s) => println("operations completed successfully.")
case Failure(exception) => println(s"error: ${exception.getMessage}")
}
}
}
object MongoAdder {
def create(port: Int): ActorSystem = {
val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
.withFallback(ConfigFactory.load())
val system = ActorSystem("ClusterSystem", config)
val mongoAdder = system.actorOf(Props[MongoAdder],"MongoAdder")
ClusterClientReceptionist(system).registerService(mongoAdder)
val receptionist = ClusterClientReceptionist(system).underlying
system.actorOf(Props(classOf[ReceptionistListener],receptionist),"mongo-event-listner")
system
}
}
MongoAdder lives in the same ClusterSystem cluster, and the code above already includes the service registration. Sending MongoDB commands from the client looks like this:
// MongoDB operation demo
import org.mongodb.scala._
import sdp.mongo.engine.MGOClasses._
val ctx = MGOContext("testdb","friends")
val chen = Document("姓" -> "陈", "名" -> "大文","age" -> 28)
val zhang = Document("姓" -> "张", "名" -> "小海","age" -> 7)
val lee = Document("姓" -> "李", "名" -> "四","age" -> 45)
val ouyang = Document("姓" -> "欧阳", "名" -> "锋","age" -> 120)
val c = ctx.setCommand(MGOCommands.Insert(Seq(chen,zhang,lee,ouyang)))
clusterClient ! Send("/user/MongoAdder",c.toSomeProto,localAffinity = true)
Since the MongoDB commands are serialized via protobuf, client.conf must be modified to tell akka to use protobuf-formatted messages:
akka {
actor {
provider = remote
serializers {
java = "akka.serialization.JavaSerializer"
proto = "akka.remote.serialization.ProtobufSerializer"
}
serialization-bindings {
"java.lang.String" = java
"scalapb.GeneratedMessage" = proto
}
}
remote.netty.tcp.port= 2553
remote.netty.tcp.hostname=127.0.0.1
}
contact-points = [
"akka.tcp://ClusterSystem@127.0.0.1:2551",
"akka.tcp://ClusterSystem@127.0.0.1:2552"]
Below is the complete demo source code for this discussion:
build.sbt
import scalapb.compiler.Version.scalapbVersion
import scalapb.compiler.Version.grpcJavaVersion
name := "akka-cluster-client"
version := "0.1"
scalaVersion := "2.12.7"
scalacOptions += "-Ypartial-unification"
libraryDependencies := Seq(
"com.typesafe.akka" %% "akka-actor" % "2.5.17",
"com.typesafe.akka" %% "akka-cluster-tools" % "2.5.17",
"com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf",
// "io.grpc" % "grpc-netty" % grpcJavaVersion,
"com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion,
"io.monix" %% "monix" % "2.3.0",
//for mongodb 4.0
"org.mongodb.scala" %% "mongo-scala-driver" % "2.4.0",
"com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "0.20",
//other dependencies
"co.fs2" %% "fs2-core" % "0.9.7",
"ch.qos.logback" % "logback-classic" % "1.2.3",
"org.typelevel" %% "cats-core" % "0.9.0",
"io.monix" %% "monix-execution" % "3.0.0-RC1",
"io.monix" %% "monix-eval" % "3.0.0-RC1"
)
PB.targets in Compile := Seq(
scalapb.gen() -> (sourceManaged in Compile).value
)
project/scalapb.sbt
addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")
libraryDependencies ++= Seq(
"com.thesamet.scalapb" %% "compilerplugin" % "0.7.4"
)
resources/application.conf
akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off
akka {
loglevel = INFO
extensions = ["akka.cluster.client.ClusterClientReceptionist"]
actor {
provider = "cluster"
serializers {
java = "akka.serialization.JavaSerializer"
proto = "akka.remote.serialization.ProtobufSerializer"
}
serialization-bindings {
"java.lang.String" = java
"scalapb.GeneratedMessage" = proto
}
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551"]
log-info = off
}
}
resources/client.conf
akka {
actor {
provider = remote
serializers {
java = "akka.serialization.JavaSerializer"
proto = "akka.remote.serialization.ProtobufSerializer"
}
serialization-bindings {
"java.lang.String" = java
"scalapb.GeneratedMessage" = proto
}
}
remote.netty.tcp.port= 2553
remote.netty.tcp.hostname=127.0.0.1
}
contact-points = [
"akka.tcp://ClusterSystem@127.0.0.1:2551",
"akka.tcp://ClusterSystem@127.0.0.1:2552"]
protobuf/sdp.proto
syntax = "proto3";
import "google/protobuf/wrappers.proto";
import "google/protobuf/any.proto";
import "scalapb/scalapb.proto";
option (scalapb.options) = {
// use a custom Scala package name
// package_name: "io.ontherocks.introgrpc.demo"
// don't append file name to package
flat_package: true
// generate one Scala file for all messages (services still get their own file)
single_file: true
// add imports to generated file
// useful when extending traits or using custom types
// import: "io.ontherocks.hellogrpc.RockingMessage"
// code to put at the top of generated file
// works only with `single_file: true`
//preamble: "sealed trait SomeSealedTrait"
};
package sdp.grpc.services;
message ProtoDate {
int32 yyyy = 1;
int32 mm = 2;
int32 dd = 3;
}
message ProtoTime {
int32 hh = 1;
int32 mm = 2;
int32 ss = 3;
int32 nnn = 4;
}
message ProtoDateTime {
ProtoDate date = 1;
ProtoTime time = 2;
}
message ProtoAny {
bytes value = 1;
}
protobuf/mgo.proto
syntax = "proto3";
import "google/protobuf/wrappers.proto";
import "google/protobuf/any.proto";
import "scalapb/scalapb.proto";
option (scalapb.options) = {
// use a custom Scala package name
// package_name: "io.ontherocks.introgrpc.demo"
// don't append file name to package
flat_package: true
// generate one Scala file for all messages (services still get their own file)
single_file: true
// add imports to generated file
// useful when extending traits or using custom types
// import: "io.ontherocks.hellogrpc.RockingMessage"
// code to put at the top of generated file
// works only with `single_file: true`
//preamble: "sealed trait SomeSealedTrait"
};
/*
 * Demoes various customization options provided by ScalaPB.
*/
package sdp.grpc.services;
import "sdp.proto";
message ProtoMGOBson {
bytes bson = 1;
}
message ProtoMGODocument {
bytes document = 1;
}
message ProtoMGOResultOption { //FindObservable
int32 optType = 1;
ProtoMGOBson bsonParam = 2;
int32 valueParam = 3;
}
message ProtoMGOAdmin{
string tarName = 1;
repeated ProtoMGOBson bsonParam = 2;
ProtoAny options = 3;
string objName = 4;
}
message ProtoMGOContext { //MGOContext
string dbName = 1;
string collName = 2;
int32 commandType = 3;
repeated ProtoMGOBson bsonParam = 4;
repeated ProtoMGOResultOption resultOptions = 5;
repeated string targets = 6;
ProtoAny options = 7;
repeated ProtoMGODocument documents = 8;
google.protobuf.BoolValue only = 9;
ProtoMGOAdmin adminOptions = 10;
}
converters/ByteConverter.scala
package protobuf.bytes
import java.io.{ByteArrayInputStream,ByteArrayOutputStream,ObjectInputStream,ObjectOutputStream}
import com.google.protobuf.ByteString
object Converter {
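// marshal/unmarshal wrap Java serialization in a protobuf ByteString, so any value passed in must be java.io.Serializable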
def marshal(value: Any): ByteString = {
val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(stream)
oos.writeObject(value)
oos.close()
ByteString.copyFrom(stream.toByteArray())
}
def unmarshal[A](bytes: ByteString): A = {
val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
val value = ois.readObject()
ois.close()
value.asInstanceOf[A]
}
}
converters/DBOResultType.scala
package sdp.result
import cats._
import cats.data.EitherT
import cats.data.OptionT
import monix.eval.Task
import cats.implicits._
import scala.concurrent._
import scala.collection.TraversableOnce
object DBOResult {
type DBOError[A] = EitherT[Task,Throwable,A]
type DBOResult[A] = OptionT[DBOError,A]
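// the stack reads inside-out as Task[Either[Throwable, Option[A]]]:
// an asynchronous computation that may fail with a Throwable or complete with no value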
implicit def valueToDBOResult[A](a: A): DBOResult[A] =
Applicative[DBOResult].pure(a)
implicit def optionToDBOResult[A](o: Option[A]): DBOResult[A] =
OptionT((o: Option[A]).pure[DBOError])
implicit def eitherToDBOResult[A](e: Either[Throwable,A]): DBOResult[A] = {
// val error: DBOError[A] = EitherT[Task,Throwable, A](Task.eval(e))
OptionT.liftF(EitherT.fromEither[Task](e))
}
implicit def futureToDBOResult[A](fut: Future[A]): DBOResult[A] = {
val task = Task.fromFuture[A](fut)
val et = EitherT.liftF[Task,Throwable,A](task)
OptionT.liftF(et)
}
implicit class DBOResultToTask[A](r: DBOResult[A]) {
def toTask = r.value.value
}
implicit class DBOResultToOption[A](r:Either[Throwable,Option[A]]) {
def someValue: Option[A] = r match {
case Left(err) => (None: Option[A])
case Right(oa) => oa
}
}
def wrapCollectionInOption[A, C[_] <: TraversableOnce[_]](coll: C[A]): DBOResult[C[A]] =
if (coll.isEmpty)
optionToDBOResult(None: Option[C[A]])
else
optionToDBOResult(Some(coll): Option[C[A]])
}
filestream/FileStreaming.scala
package sdp.file
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.ByteBuffer
import java.nio.file.Paths
import akka.stream.Materializer
import akka.stream.scaladsl.{FileIO, StreamConverters}
import akka.util._
import scala.concurrent.Await
import scala.concurrent.duration._
object Streaming {
def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60 seconds)(
implicit mat: Materializer):ByteBuffer = {
val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
hd ++ bs
}
(Await.result(fut, timeOut)).toByteBuffer
}
def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60 seconds)(
implicit mat: Materializer): Array[Byte] = {
val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
hd ++ bs
}
(Await.result(fut, timeOut)).toArray
}
def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60 seconds)(
implicit mat: Materializer): InputStream = {
val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
hd ++ bs
}
val buf = (Await.result(fut, timeOut)).toArray
new ByteArrayInputStream(buf)
}
def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)(
implicit mat: Materializer) = {
val ba = new Array[Byte](byteBuf.remaining())
byteBuf.get(ba,0,ba.length)
val baInput = new ByteArrayInputStream(ba)
val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes))
source.runWith(FileIO.toPath(Paths.get(fileName)))
}
def ByteArrayToFile(bytes: Array[Byte], fileName: String)(
implicit mat: Materializer) = {
val bb = ByteBuffer.wrap(bytes)
val baInput = new ByteArrayInputStream(bytes)
val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes))
source.runWith(FileIO.toPath(Paths.get(fileName)))
}
def InputStreamToFile(is: InputStream, fileName: String)(
implicit mat: Materializer) = {
val source = StreamConverters.fromInputStream(() => is)
source.runWith(FileIO.toPath(Paths.get(fileName)))
}
}
logging/Log.scala
package sdp.logging
import org.slf4j.Logger
/**
* Logger which just wraps org.slf4j.Logger internally.
*
* @param logger logger
*/
class Log(logger: Logger) {
// use var consciously to enable squeezing later
var isDebugEnabled: Boolean = logger.isDebugEnabled
var isInfoEnabled: Boolean = logger.isInfoEnabled
var isWarnEnabled: Boolean = logger.isWarnEnabled
var isErrorEnabled: Boolean = logger.isErrorEnabled
def withLevel(level: Symbol)(msg: => String, e: Throwable = null): Unit = {
level match {
case 'debug | 'DEBUG => debug(msg)
case 'info | 'INFO => info(msg)
case 'warn | 'WARN => warn(msg)
case 'error | 'ERROR => error(msg)
case _ => // nothing to do
}
}
def debug(msg: => String): Unit = {
if (isDebugEnabled && logger.isDebugEnabled) {
logger.debug(msg)
}
}
def debug(msg: => String, e: Throwable): Unit = {
if (isDebugEnabled && logger.isDebugEnabled) {
logger.debug(msg, e)
}
}
def info(msg: => String): Unit = {
if (isInfoEnabled && logger.isInfoEnabled) {
logger.info(msg)
}
}
def info(msg: => String, e: Throwable): Unit = {
if (isInfoEnabled && logger.isInfoEnabled) {
logger.info(msg, e)
}
}
def warn(msg: => String): Unit = {
if (isWarnEnabled && logger.isWarnEnabled) {
logger.warn(msg)
}
}
def warn(msg: => String, e: Throwable): Unit = {
if (isWarnEnabled && logger.isWarnEnabled) {
logger.warn(msg, e)
}
}
def error(msg: => String): Unit = {
if (isErrorEnabled && logger.isErrorEnabled) {
logger.error(msg)
}
}
def error(msg: => String, e: Throwable): Unit = {
if (isErrorEnabled && logger.isErrorEnabled) {
logger.error(msg, e)
}
}
}
logging/LogSupport.scala
package sdp.logging
import org.slf4j.LoggerFactory
trait LogSupport {
/**
* Logger
*/
protected val log = new Log(LoggerFactory.getLogger(this.getClass))
}
mgo/engine/MGOProtoConversion.scala
package sdp.mongo.engine
import org.mongodb.scala.bson.collection.immutable.Document
import org.bson.conversions.Bson
import sdp.grpc.services._
import protobuf.bytes.Converter._
import MGOClasses._
import MGOAdmins._
import MGOCommands._
import org.bson.BsonDocument
import org.bson.codecs.configuration.CodecRegistry
import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY
import org.mongodb.scala.FindObservable
object MGOProtoConversion {
type MGO_COMMAND_TYPE = Int
val MGO_COMMAND_FIND = 0
val MGO_COMMAND_COUNT = 20
val MGO_COMMAND_DISTICT = 21
val MGO_COMMAND_DOCUMENTSTREAM = 1
val MGO_COMMAND_AGGREGATE = 2
val MGO_COMMAND_INSERT = 3
val MGO_COMMAND_DELETE = 4
val MGO_COMMAND_REPLACE = 5
val MGO_COMMAND_UPDATE = 6
val MGO_ADMIN_DROPCOLLECTION = 8
val MGO_ADMIN_CREATECOLLECTION = 9
val MGO_ADMIN_LISTCOLLECTION = 10
val MGO_ADMIN_CREATEVIEW = 11
val MGO_ADMIN_CREATEINDEX = 12
val MGO_ADMIN_DROPINDEXBYNAME = 13
val MGO_ADMIN_DROPINDEXBYKEY = 14
val MGO_ADMIN_DROPALLINDEXES = 15
case class AdminContext(
tarName: String = "",
bsonParam: Seq[Bson] = Nil,
options: Option[Any] = None,
objName: String = ""
){
def toProto = sdp.grpc.services.ProtoMGOAdmin(
tarName = this.tarName,
bsonParam = this.bsonParam.map {b => sdp.grpc.services.ProtoMGOBson(marshal(b))},
objName = this.objName,
options = this.options.map(b => ProtoAny(marshal(b)))
)
}
object AdminContext {
def fromProto(msg: sdp.grpc.services.ProtoMGOAdmin) = new AdminContext(
tarName = msg.tarName,
bsonParam = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),
objName = msg.objName,
options = msg.options.map(b => unmarshal[Any](b.value))
)
}
case class Context(
dbName: String = "",
collName: String = "",
commandType: MGO_COMMAND_TYPE,
bsonParam: Seq[Bson] = Nil,
resultOptions: Seq[ResultOptions] = Nil,
options: Option[Any] = None,
documents: Seq[Document] = Nil,
targets: Seq[String] = Nil,
only: Boolean = false,
adminOptions: Option[AdminContext] = None
){
def toProto = new sdp.grpc.services.ProtoMGOContext(
dbName = this.dbName,
collName = this.collName,
commandType = this.commandType,
bsonParam = this.bsonParam.map(bsonToProto),
resultOptions = this.resultOptions.map(_.toProto),
options = { if(this.options == None)
None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else
Some(ProtoAny(marshal(this.options.get))) },
documents = this.documents.map(d => sdp.grpc.services.ProtoMGODocument(marshal(d))),
targets = this.targets,
only = Some(this.only),
adminOptions = this.adminOptions.map(_.toProto)
)
}
object MGODocument {
def fromProto(msg: sdp.grpc.services.ProtoMGODocument): Document =
unmarshal[Document](msg.document)
def toProto(doc: Document): sdp.grpc.services.ProtoMGODocument =
new ProtoMGODocument(marshal(doc))
}
object MGOProtoMsg {
def fromProto(msg: sdp.grpc.services.ProtoMGOContext) = new Context(
dbName = msg.dbName,
collName = msg.collName,
commandType = msg.commandType,
bsonParam = msg.bsonParam.map(protoToBson),
resultOptions = msg.resultOptions.map(r => ResultOptions.fromProto(r)),
options = msg.options.map(a => unmarshal[Any](a.value)),
documents = msg.documents.map(doc => unmarshal[Document](doc.document)),
targets = msg.targets,
adminOptions = msg.adminOptions.map(ado => AdminContext.fromProto(ado))
)
}
def bsonToProto(bson: Bson) =
ProtoMGOBson(marshal(bson.toBsonDocument(
classOf[org.mongodb.scala.bson.collection.immutable.Document],DEFAULT_CODEC_REGISTRY)))
def protoToBson(proto: ProtoMGOBson): Bson = new Bson {
val bsdoc = unmarshal[BsonDocument](proto.bson)
override def toBsonDocument[TDocument](documentClass: Class[TDocument], codecRegistry: CodecRegistry): BsonDocument = bsdoc
}
def ctxFromProto(proto: ProtoMGOContext): MGOContext = proto.commandType match {
case MGO_COMMAND_FIND => {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_QUERY,
action = Some(Find())
)
def toResultOption(rts: Seq[ProtoMGOResultOption]): FindObservable[Document] => FindObservable[Document] = findObj =>
rts.foldRight(findObj)((a,b) => ResultOptions.fromProto(a).toFindObservable(b))
(proto.bsonParam, proto.resultOptions, proto.only) match {
case (Nil, Nil, None) => ctx
case (Nil, Nil, Some(b)) => ctx.setCommand(Find(firstOnly = b))
case (bp,Nil,None) => ctx.setCommand(
Find(filter = Some(protoToBson(bp.head))))
case (bp,Nil,Some(b)) => ctx.setCommand(
Find(filter = Some(protoToBson(bp.head)), firstOnly = b))
case (bp,fo,None) => {
ctx.setCommand(
Find(filter = Some(protoToBson(bp.head)),
andThen = fo.map(ResultOptions.fromProto)
))
}
case (bp,fo,Some(b)) => {
ctx.setCommand(
Find(filter = Some(protoToBson(bp.head)),
andThen = fo.map(ResultOptions.fromProto),
firstOnly = b))
}
case _ => ctx
}
}
case MGO_COMMAND_COUNT => {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_QUERY,
action = Some(Count())
)
(proto.bsonParam, proto.options) match {
case (Nil, None) => ctx
case (bp, None) => ctx.setCommand(
Count(filter = Some(protoToBson(bp.head)))
)
case (Nil,Some(o)) => ctx.setCommand(
Count(options = Some(unmarshal[Any](o.value)))
)
case _ => ctx
}
}
case MGO_COMMAND_DISTICT => {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_QUERY,
action = Some(Distict(fieldName = proto.targets.head))
)
(proto.bsonParam) match {
case Nil => ctx
case bp: Seq[ProtoMGOBson] => ctx.setCommand(
Distict(fieldName = proto.targets.head,filter = Some(protoToBson(bp.head)))
)
case _ => ctx
}
}
case MGO_COMMAND_AGGREGATE => {
new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_QUERY,
action = Some(Aggregate(proto.bsonParam.map(p => protoToBson(p))))
)
}
case MGO_ADMIN_LISTCOLLECTION => {
new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_QUERY,
action = Some(ListCollection(proto.dbName)))
}
case MGO_COMMAND_INSERT => {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_UPDATE,
action = Some(Insert(
newdocs = proto.documents.map(doc => unmarshal[Document](doc.document))))
)
proto.options match {
case None => ctx
case Some(o) => ctx.setCommand(Insert(
newdocs = proto.documents.map(doc => unmarshal[Document](doc.document)),
options = Some(unmarshal[Any](o.value)))
)
}
}
case MGO_COMMAND_DELETE => {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_UPDATE,
action = Some(Delete(
filter = protoToBson(proto.bsonParam.head)))
)
(proto.options, proto.only) match {
case (None,None) => ctx
case (None,Some(b)) => ctx.setCommand(Delete(
filter = protoToBson(proto.bsonParam.head),
onlyOne = b))
case (Some(o),None) => ctx.setCommand(Delete(
filter = protoToBson(proto.bsonParam.head),
options = Some(unmarshal[Any](o.value)))
)
case (Some(o),Some(b)) => ctx.setCommand(Delete(
filter = protoToBson(proto.bsonParam.head),
options = Some(unmarshal[Any](o.value)),
onlyOne = b)
)
}
}
case MGO_COMMAND_REPLACE => {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_UPDATE,
action = Some(Replace(
filter = protoToBson(proto.bsonParam.head),
replacement = unmarshal[Document](proto.documents.head.document)))
)
proto.options match {
case None => ctx
case Some(o) => ctx.setCommand(Replace(
filter = protoToBson(proto.bsonParam.head),
replacement = unmarshal[Document](proto.documents.head.document),
options = Some(unmarshal[Any](o.value)))
)
}
}
case MGO_COMMAND_UPDATE => {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_UPDATE,
action = Some(Update(
filter = protoToBson(proto.bsonParam.head),
update = protoToBson(proto.bsonParam.tail.head)))
)
(proto.options, proto.only) match {
case (None,None) => ctx
case (None,Some(b)) => ctx.setCommand(Update(
filter = protoToBson(proto.bsonParam.head),
update = protoToBson(proto.bsonParam.tail.head),
onlyOne = b))
case (Some(o),None) => ctx.setCommand(Update(
filter = protoToBson(proto.bsonParam.head),
update = protoToBson(proto.bsonParam.tail.head),
options = Some(unmarshal[Any](o.value)))
)
case (Some(o),Some(b)) => ctx.setCommand(Update(
filter = protoToBson(proto.bsonParam.head),
update = protoToBson(proto.bsonParam.tail.head),
options = Some(unmarshal[Any](o.value)),
onlyOne = b)
)
}
}
case MGO_ADMIN_DROPCOLLECTION =>
new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_ADMIN,
action = Some(DropCollection(proto.collName))
)
case MGO_ADMIN_CREATECOLLECTION => {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_ADMIN,
action = Some(CreateCollection(proto.collName))
)
proto.options match {
case None => ctx
case Some(o) => ctx.setCommand(CreateCollection(proto.collName,
options = Some(unmarshal[Any](o.value)))
)
}
}
case MGO_ADMIN_CREATEVIEW => {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_ADMIN,
action = Some(CreateView(viewName = proto.targets.head,
viewOn = proto.targets.tail.head,
pipeline = proto.bsonParam.map(p => protoToBson(p))))
)
proto.options match {
case None => ctx
case Some(o) => ctx.setCommand(CreateView(viewName = proto.targets.head,
viewOn = proto.targets.tail.head,
pipeline = proto.bsonParam.map(p => protoToBson(p)),
options = Some(unmarshal[Any](o.value)))
)
}
}
case MGO_ADMIN_CREATEINDEX=> {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_ADMIN,
action = Some(CreateIndex(key = protoToBson(proto.bsonParam.head)))
)
proto.options match {
case None => ctx
case Some(o) => ctx.setCommand(CreateIndex(key = protoToBson(proto.bsonParam.head),
options = Some(unmarshal[Any](o.value)))
)
}
}
case MGO_ADMIN_DROPINDEXBYNAME=> {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_ADMIN,
action = Some(DropIndexByName(indexName = proto.targets.head))
)
proto.options match {
case None => ctx
case Some(o) => ctx.setCommand(DropIndexByName(indexName = proto.targets.head,
options = Some(unmarshal[Any](o.value)))
)
}
}
case MGO_ADMIN_DROPINDEXBYKEY=> {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_ADMIN,
action = Some(DropIndexByKey(key = protoToBson(proto.bsonParam.head)))
)
proto.options match {
case None => ctx
case Some(o) => ctx.setCommand(DropIndexByKey(key = protoToBson(proto.bsonParam.head),
options = Some(unmarshal[Any](o.value)))
)
}
}
case MGO_ADMIN_DROPALLINDEXES=> {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_ADMIN,
action = Some(DropAllIndexes())
)
proto.options match {
case None => ctx
case Some(o) => ctx.setCommand(DropAllIndexes(
options = Some(unmarshal[Any](o.value)))
)
}
}
}
def ctxToProto(ctx: MGOContext): Option[sdp.grpc.services.ProtoMGOContext] = ctx.action match {
case None => None
case Some(act) => act match {
case Count(filter, options) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_COMMAND_COUNT,
bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
else Seq(bsonToProto(filter.get))},
options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) }
))
case Distict(fieldName, filter) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_COMMAND_DISTICT,
bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
else Seq(bsonToProto(filter.get))},
targets = Seq(fieldName)
))
case Find(filter, andThen, firstOnly) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_COMMAND_FIND,
bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
else Seq(bsonToProto(filter.get))},
resultOptions = andThen.map(_.toProto)
))
case Aggregate(pipeLine) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_COMMAND_AGGREGATE,
bsonParam = pipeLine.map(bsonToProto)
))
case Insert(newdocs, options) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_COMMAND_INSERT,
documents = newdocs.map(d => ProtoMGODocument(marshal(d))),
options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) }
))
case Delete(filter, options, onlyOne) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_COMMAND_DELETE,
bsonParam = Seq(bsonToProto(filter)),
options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) },
only = Some(onlyOne)
))
case Replace(filter, replacement, options) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_COMMAND_REPLACE,
bsonParam = Seq(bsonToProto(filter)),
options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) },
documents = Seq(ProtoMGODocument(marshal(replacement)))
))
case Update(filter, update, options, onlyOne) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_COMMAND_UPDATE,
bsonParam = Seq(bsonToProto(filter),bsonToProto(update)),
options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) },
only = Some(onlyOne)
))
case DropCollection(coll) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = coll,
commandType = MGO_ADMIN_DROPCOLLECTION
))
case CreateCollection(coll, options) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = coll,
commandType = MGO_ADMIN_CREATECOLLECTION,
options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) }
))
case ListCollection(dbName) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
commandType = MGO_ADMIN_LISTCOLLECTION
))
case CreateView(viewName, viewOn, pipeline, options) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_ADMIN_CREATEVIEW,
bsonParam = pipeline.map(bsonToProto),
options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) },
targets = Seq(viewName,viewOn)
))
case CreateIndex(key, options) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_ADMIN_CREATEINDEX,
bsonParam = Seq(bsonToProto(key)),
options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) }
))
case DropIndexByName(indexName, options) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_ADMIN_DROPINDEXBYNAME,
targets = Seq(indexName),
options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) }
))
case DropIndexByKey(key, options) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_ADMIN_DROPINDEXBYKEY,
bsonParam = Seq(bsonToProto(key)),
options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) }
))
case DropAllIndexes(options) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_ADMIN_DROPALLINDEXES,
options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) }
))
}
}
}
mgo/engine/MongoDBEngine.scala
package sdp.mongo.engine
import java.text.SimpleDateFormat
import java.util.Calendar
import akka.NotUsed
import akka.stream.Materializer
import akka.stream.alpakka.mongodb.scaladsl._
import akka.stream.scaladsl.{Flow, Source}
import org.bson.conversions.Bson
import org.mongodb.scala.bson.collection.immutable.Document
import org.mongodb.scala.bson.{BsonArray, BsonBinary}
import org.mongodb.scala.model._
import org.mongodb.scala.{MongoClient, _}
import protobuf.bytes.Converter._
import sdp.file.Streaming._
import sdp.logging.LogSupport
import scala.collection.JavaConverters._
import scala.concurrent._
import scala.concurrent.duration._
object MGOClasses {
type MGO_ACTION_TYPE = Int
val MGO_QUERY = 0
val MGO_UPDATE = 1
val MGO_ADMIN = 2
/* org.mongodb.scala.FindObservable
import com.mongodb.async.client.FindIterable
val resultDocType = FindIterable[Document]
val resultOption = FindObservable(resultDocType)
.maxScan(...)
.limit(...)
.sort(...)
.project(...) */
type FOD_TYPE = Int
val FOD_FIRST = 0 //def first(): SingleObservable[TResult], return the first item
val FOD_FILTER = 1 //def filter(filter: Bson): FindObservable[TResult]
val FOD_LIMIT = 2 //def limit(limit: Int): FindObservable[TResult]
val FOD_SKIP = 3 //def skip(skip: Int): FindObservable[TResult]
val FOD_PROJECTION = 4 //def projection(projection: Bson): FindObservable[TResult]
//Sets a document describing the fields to return for all matching documents
val FOD_SORT = 5 //def sort(sort: Bson): FindObservable[TResult]
val FOD_PARTIAL = 6 //def partial(partial: Boolean): FindObservable[TResult]
//Get partial results from a sharded cluster if one or more shards are unreachable (instead of throwing an error)
val FOD_CURSORTYPE = 7 //def cursorType(cursorType: CursorType): FindObservable[TResult]
//Sets the cursor type
val FOD_HINT = 8 //def hint(hint: Bson): FindObservable[TResult]
//Sets the hint for which index to use. A null value means no hint is set
val FOD_MAX = 9 //def max(max: Bson): FindObservable[TResult]
//Sets the exclusive upper bound for a specific index. A null value means no max is set
val FOD_MIN = 10 //def min(min: Bson): FindObservable[TResult]
//Sets the minimum inclusive lower bound for a specific index. A null value means no max is set
val FOD_RETURNKEY = 11 //def returnKey(returnKey: Boolean): FindObservable[TResult]
//Sets the returnKey. If true the find operation will return only the index keys in the resulting documents
val FOD_SHOWRECORDID=12 //def showRecordId(showRecordId: Boolean): FindObservable[TResult]
//Sets the showRecordId. Set to true to add a field `\$recordId` to the returned documents
case class ResultOptions(
optType: FOD_TYPE,
bson: Option[Bson] = None,
value: Int = 0 ){
def toProto = new sdp.grpc.services.ProtoMGOResultOption(
optType = this.optType,
bsonParam = this.bson.map {b => sdp.grpc.services.ProtoMGOBson(marshal(b))},
valueParam = this.value
)
def toFindObservable: FindObservable[Document] => FindObservable[Document] = find => {
optType match {
case FOD_FIRST => find
case FOD_FILTER => find.filter(bson.get)
case FOD_LIMIT => find.limit(value)
case FOD_SKIP => find.skip(value)
case FOD_PROJECTION => find.projection(bson.get)
case FOD_SORT => find.sort(bson.get)
case FOD_PARTIAL => find.partial(value != 0)
case FOD_CURSORTYPE => find
case FOD_HINT => find.hint(bson.get)
case FOD_MAX => find.max(bson.get)
case FOD_MIN => find.min(bson.get)
case FOD_RETURNKEY => find.returnKey(value != 0)
case FOD_SHOWRECORDID => find.showRecordId(value != 0)
}
}
}
object ResultOptions {
def fromProto(msg: sdp.grpc.services.ProtoMGOResultOption) = new ResultOptions(
optType = msg.optType,
bson = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),
value = msg.valueParam
)
}
trait MGOCommands
object MGOCommands {
case class Count(filter: Option[Bson] = None, options: Option[Any] = None) extends MGOCommands
case class Distict(fieldName: String, filter: Option[Bson] = None) extends MGOCommands
/* org.mongodb.scala.FindObservable
import com.mongodb.async.client.FindIterable
val resultDocType = FindIterable[Document]
val resultOption = FindObservable(resultDocType)
.maxScan(...)
.limit(...)
.sort(...)
.project(...) */
case class Find(filter: Option[Bson] = None,
andThen: Seq[ResultOptions] = Seq.empty[ResultOptions],
firstOnly: Boolean = false) extends MGOCommands
case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands
case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands
case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands
case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands
case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands
}
object MGOAdmins {
case class DropCollection(collName: String) extends MGOCommands
case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands
case class ListCollection(dbName: String) extends MGOCommands
case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands
case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands
case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands
case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands
case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands
}
case class MGOContext(
dbName: String,
collName: String,
actionType: MGO_ACTION_TYPE = MGO_QUERY,
action: Option[MGOCommands] = None,
actionOptions: Option[Any] = None,
actionTargets: Seq[String] = Nil
) {
ctx =>
def setDbName(name: String): MGOContext = ctx.copy(dbName = name)
def setCollName(name: String): MGOContext = ctx.copy(collName = name)
def setActionType(at: MGO_ACTION_TYPE): MGOContext = ctx.copy(actionType = at)
def setCommand(cmd: MGOCommands): MGOContext = ctx.copy(action = Some(cmd))
def toSomeProto = MGOProtoConversion.ctxToProto(this)
}
object MGOContext {
def apply(db: String, coll: String) = new MGOContext(db, coll)
def fromProto(proto: sdp.grpc.services.ProtoMGOContext): MGOContext =
MGOProtoConversion.ctxFromProto(proto)
}
case class MGOBatContext(contexts: Seq[MGOContext], tx: Boolean = false) {
ctxs =>
def setTx(txopt: Boolean): MGOBatContext = ctxs.copy(tx = txopt)
def appendContext(ctx: MGOContext): MGOBatContext =
ctxs.copy(contexts = contexts :+ ctx)
}
object MGOBatContext {
def apply(ctxs: Seq[MGOContext], tx: Boolean = false) = new MGOBatContext(ctxs,tx)
}
type MGODate = java.util.Date
def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = {
val ca = Calendar.getInstance()
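// note: java.util.Calendar months are 0-based (January = 0); mm is passed through unchanged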
ca.set(yyyy,mm,dd)
ca.getTime()
}
def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = {
val ca = Calendar.getInstance()
ca.set(yyyy,mm,dd,hr,min,sec)
ca.getTime()
}
def mgoDateTimeNow: MGODate = {
val ca = Calendar.getInstance()
ca.getTime
}
def mgoDateToString(dt: MGODate, formatString: String): String = {
val fmt= new SimpleDateFormat(formatString)
fmt.format(dt)
}
type MGOBlob = BsonBinary
type MGOArray = BsonArray
def fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)(
implicit mat: Materializer) = FileToByteArray(fileName,timeOut)
def mgoBlobToFile(blob: MGOBlob, fileName: String)(
implicit mat: Materializer) = ByteArrayToFile(blob.getData,fileName)
def mgoGetStringOrNone(doc: Document, fieldName: String) = {
if (doc.keySet.contains(fieldName))
Some(doc.getString(fieldName))
else None
}
def mgoGetIntOrNone(doc: Document, fieldName: String) = {
if (doc.keySet.contains(fieldName))
Some(doc.getInteger(fieldName))
else None
}
def mgoGetLonggOrNone(doc: Document, fieldName: String) = {
if (doc.keySet.contains(fieldName))
Some(doc.getLong(fieldName))
else None
}
def mgoGetDoubleOrNone(doc: Document, fieldName: String) = {
if (doc.keySet.contains(fieldName))
Some(doc.getDouble(fieldName))
else None
}
def mgoGetBoolOrNone(doc: Document, fieldName: String) = {
if (doc.keySet.contains(fieldName))
Some(doc.getBoolean(fieldName))
else None
}
def mgoGetDateOrNone(doc: Document, fieldName: String) = {
if (doc.keySet.contains(fieldName))
Some(doc.getDate(fieldName))
else None
}
def mgoGetBlobOrNone(doc: Document, fieldName: String) = {
if (doc.keySet.contains(fieldName))
doc.get(fieldName).asInstanceOf[Option[MGOBlob]]
else None
}
def mgoGetArrayOrNone(doc: Document, fieldName: String) = {
if (doc.keySet.contains(fieldName))
doc.get(fieldName).asInstanceOf[Option[MGOArray]]
else None
}
def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = {
(arr.getValues.asScala.toList)
.asInstanceOf[scala.collection.immutable.List[org.bson.BsonDocument]]
}
type MGOFilterResult = FindObservable[Document] => FindObservable[Document]
}
object MGOEngine extends LogSupport {
import MGOClasses._
import MGOAdmins._
import MGOCommands._
import sdp.result.DBOResult._
object TxUpdateMode {
private def mgoTxUpdate(ctxs: MGOBatContext, observable: SingleObservable[ClientSession])(
implicit client: MongoClient, ec: ExecutionContext): SingleObservable[ClientSession] = {
log.info(s"mgoTxUpdate> calling ...")
observable.map(clientSession => {
val transactionOptions =
TransactionOptions.builder()
.readConcern(ReadConcern.SNAPSHOT)
.writeConcern(WriteConcern.MAJORITY).build()
clientSession.startTransaction(transactionOptions)
/*
val fut = Future.traverse(ctxs.contexts) { ctx =>
mgoUpdateObservable[Completed](ctx).map(identity).toFuture()
}
Await.ready(fut, 3 seconds) */
ctxs.contexts.foreach { ctx =>
mgoUpdateObservable[Completed](ctx).map(identity).toFuture()
}
clientSession
})
}
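// retry the commit when MongoDB reports UnknownTransactionCommitResult, following the MongoDB transaction retry pattern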
private def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
log.info(s"commitAndRetry> calling ...")
observable.recoverWith({
case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
log.warn("commitAndRetry> UnknownTransactionCommitResult, retrying commit operation ...")
commitAndRetry(observable)
}
case e: Exception => {
log.error(s"commitAndRetry> Exception during commit ...: $e")
throw e
}
})
}
private def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
log.info(s"runTransactionAndRetry> calling ...")
observable.recoverWith({
case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
log.warn("runTransactionAndRetry> TransientTransactionError, aborting transaction and retrying ...")
runTransactionAndRetry(observable)
}
})
}
def mgoTxBatch(ctxs: MGOBatContext)(
implicit client: MongoClient, ec: ExecutionContext): DBOResult[Completed] = {
log.info(s"mgoTxBatch> MGOBatContext: ${ctxs}")
val updateObservable: Observable[ClientSession] = mgoTxUpdate(ctxs, client.startSession())
val commitTransactionObservable: SingleObservable[Completed] =
updateObservable.flatMap(clientSession => clientSession.commitTransaction())
val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)
runTransactionAndRetry(commitAndRetryObservable)
valueToDBOResult(Completed())
}
}
def mgoUpdateBatch(ctxs: MGOBatContext)(implicit client: MongoClient, ec: ExecutionContext): DBOResult[Completed] = {
log.info(s"mgoUpdateBatch> MGOBatContext: ${ctxs}")
if (ctxs.tx) {
TxUpdateMode.mgoTxBatch(ctxs)
} else {
/*
val fut = Future.traverse(ctxs.contexts) { ctx =>
mgoUpdate[Completed](ctx).map(identity) }
Await.ready(fut, 3 seconds)
Future.successful(new Completed) */
ctxs.contexts.foreach { ctx =>
mgoUpdate[Completed](ctx).map(identity) }
valueToDBOResult(Completed())
}
}
def mongoStream(ctx: MGOContext)(
implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = {
log.info(s"mongoStream> MGOContext: ${ctx}")
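// applies the chained ResultOptions to the base FindObservable (foldRight: the last option is applied first)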
def toResultOption(rts: Seq[ResultOptions]): FindObservable[Document] => FindObservable[Document] = findObj =>
rts.foldRight(findObj)((a,b) => a.toFindObservable(b))
val db = client.getDatabase(ctx.dbName)
val coll = db.getCollection(ctx.collName)
if ( ctx.action == None) {
log.error(s"mongoStream> query action cannot be null!")
throw new IllegalArgumentException("query action cannot be null!")
}
try {
ctx.action.get match {
case Find(None, Nil, false) => //FindObservable
MongoSource(coll.find())
case Find(None, Nil, true) => //FindObservable
MongoSource(coll.find().first())
case Find(Some(filter), Nil, false) => //FindObservable
MongoSource(coll.find(filter))
case Find(Some(filter), Nil, true) => //FindObservable
MongoSource(coll.find(filter).first())
case Find(None, sro, _) => //FindObservable
val next = toResultOption(sro)
MongoSource(next(coll.find[Document]()))
case Find(Some(filter), sro, _) => //FindObservable
val next = toResultOption(sro)
MongoSource(next(coll.find[Document](filter)))
case _ =>
log.error(s"mongoStream> unsupported streaming query [${ctx.action.get}]")
throw new RuntimeException(s"mongoStream> unsupported streaming query [${ctx.action.get}]")
}
}
catch { case e: Exception =>
log.error(s"mongoStream> runtime error: ${e.getMessage}")
throw new RuntimeException(s"mongoStream> Error: ${e.getMessage}")
}
}
// T => FindIterable e.g List[Document]
def mgoQuery[T](ctx: MGOContext, Converter: Option[Document => Any] = None)(implicit client: MongoClient): DBOResult[T] = {
log.info(s"mgoQuery> MGOContext: ${ctx}")
val db = client.getDatabase(ctx.dbName)
val coll = db.getCollection(ctx.collName)
def toResultOption(rts: Seq[ResultOptions]): FindObservable[Document] => FindObservable[Document] = findObj =>
rts.foldRight(findObj)((a,b) => a.toFindObservable(b))
if ( ctx.action == None) {
log.error(s"mgoQuery> query action cannot be null!")
Left(new IllegalArgumentException("query action cannot be null!"))
}
try {
ctx.action.get match {
/* count */
case Count(Some(filter), Some(opt)) => //SingleObservable
coll.countDocuments(filter, opt.asInstanceOf[CountOptions])
.toFuture().asInstanceOf[Future[T]]
case Count(Some(filter), None) => //SingleObservable
coll.countDocuments(filter).toFuture()
.asInstanceOf[Future[T]]
case Count(None, None) => //SingleObservable
coll.countDocuments().toFuture()
.asInstanceOf[Future[T]]
/* distinct */
case Distict(field, Some(filter)) => //DistinctObservable
coll.distinct(field, filter).toFuture()
.asInstanceOf[Future[T]]
case Distict(field, None) => //DistinctObservable
coll.distinct((field)).toFuture()
.asInstanceOf[Future[T]]
/* find */
case Find(None, Nil, false) => //FindObservable
if (Converter == None) coll.find().toFuture().asInstanceOf[Future[T]]
else coll.find().map(Converter.get).toFuture().asInstanceOf[Future[T]]
case Find(None, Nil, true) => //FindObservable
if (Converter == None) coll.find().first().head().asInstanceOf[Future[T]]
else coll.find().first().map(Converter.get).head().asInstanceOf[Future[T]]
case Find(Some(filter), Nil, false) => //FindObservable
if (Converter == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]
else coll.find(filter).map(Converter.get).toFuture().asInstanceOf[Future[T]]
case Find(Some(filter), Nil, true) => //FindObservable
if (Converter == None) coll.find(filter).first().head().asInstanceOf[Future[T]]
else coll.find(filter).first().map(Converter.get).head().asInstanceOf[Future[T]]
case Find(None, sro, _) => //FindObservable
val next = toResultOption(sro)
if (Converter == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]
else next(coll.find[Document]()).map(Converter.get).toFuture().asInstanceOf[Future[T]]
case Find(Some(filter), sro, _) => //FindObservable
val next = toResultOption(sro)
if (Converter == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]
else next(coll.find[Document](filter)).map(Converter.get).toFuture().asInstanceOf[Future[T]]
/* aggregate AggregateObservable*/
case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
/* mapReduce MapReduceObservable*/
case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]
/* list collection */
case ListCollection(dbName) => //ListConllectionObservable
client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
}
}
catch { case e: Exception =>
log.error(s"mgoQuery> runtime error: ${e.getMessage}")
Left(new RuntimeException(s"mgoQuery> Error: ${e.getMessage}"))
}
}
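A usage sketch for the query side; the command shapes mirror the match cases above, and the filter field is only an example:
import org.mongodb.scala._
import org.mongodb.scala.model.Filters.equal
implicit val client: MongoClient = MongoClient()
val base = MGOContext("testdb", "friends")
// count every document in the collection
val total: DBOResult[Long] = mgoQuery[Long](base.setCommand(MGOCommands.Count(None, None)))
// fetch the documents matching a filter
val hits: DBOResult[Seq[Document]] =
  mgoQuery[Seq[Document]](base.setCommand(MGOCommands.Find(Some(equal("姓", "陈")), Nil, false)))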
//T => Completed, result.UpdateResult, result.DeleteResult
def mgoUpdate[T](ctx: MGOContext)(implicit client: MongoClient): DBOResult[T] =
try {
mgoUpdateObservable[T](ctx).toFuture()
}
catch { case e: Exception =>
log.error(s"mgoUpdate> runtime error: ${e.getMessage}")
Left(new RuntimeException(s"mgoUpdate> Error: ${e.getMessage}"))
}
def mgoUpdateObservable[T](ctx: MGOContext)(implicit client: MongoClient): SingleObservable[T] = {
log.info(s"mgoUpdateObservable> MGOContext: ${ctx}")
val db = client.getDatabase(ctx.dbName)
val coll = db.getCollection(ctx.collName)
if ( ctx.action == None) {
log.error(s"mgoUpdateObservable> uery action cannot be null!")
throw new IllegalArgumentException("mgoUpdateObservable> query action cannot be null!")
}
try {
ctx.action.get match {
/* insert */
case Insert(docs, Some(opt)) => //SingleObservable[Completed]
if (docs.size > 1)
coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).asInstanceOf[SingleObservable[T]]
else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).asInstanceOf[SingleObservable[T]]
case Insert(docs, None) => //SingleObservable
if (docs.size > 1) coll.insertMany(docs).asInstanceOf[SingleObservable[T]]
else coll.insertOne(docs.head).asInstanceOf[SingleObservable[T]]
/* delete */
case Delete(filter, None, onlyOne) => //SingleObservable
if (onlyOne) coll.deleteOne(filter).asInstanceOf[SingleObservable[T]]
else coll.deleteMany(filter).asInstanceOf[SingleObservable[T]]
case Delete(filter, Some(opt), onlyOne) => //SingleObservable
if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]
else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]
/* replace */
case Replace(filter, replacement, None) => //SingleObservable
coll.replaceOne(filter, replacement).asInstanceOf[SingleObservable[T]]
case Replace(filter, replacement, Some(opt)) => //SingleObservable
coll.replaceOne(filter, replacement, opt.asInstanceOf[ReplaceOptions]).asInstanceOf[SingleObservable[T]]
/* update */
case Update(filter, update, None, onlyOne) => //SingleObservable
if (onlyOne) coll.updateOne(filter, update).asInstanceOf[SingleObservable[T]]
else coll.updateMany(filter, update).asInstanceOf[SingleObservable[T]]
case Update(filter, update, Some(opt), onlyOne) => //SingleObservable
if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
/* bulkWrite */
case BulkWrite(commands, None) => //SingleObservable
coll.bulkWrite(commands).asInstanceOf[SingleObservable[T]]
case BulkWrite(commands, Some(opt)) => //SingleObservable
coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).asInstanceOf[SingleObservable[T]]
}
}
catch { case e: Exception =>
log.error(s"mgoUpdateObservable> runtime error: ${e.getMessage}")
throw new RuntimeException(s"mgoUpdateObservable> Error: ${e.getMessage}")
}
}
def mgoAdmin(ctx: MGOContext)(implicit client: MongoClient): DBOResult[Completed] = {
log.info(s"mgoAdmin> MGOContext: ${ctx}")
val db = client.getDatabase(ctx.dbName)
val coll = db.getCollection(ctx.collName)
if ( ctx.action == None) {
log.error(s"mgoAdmin> uery action cannot be null!")
Left(new IllegalArgumentException("mgoAdmin> query action cannot be null!"))
}
try {
ctx.action.get match {
/* drop collection */
case DropCollection(collName) => //SingleObservable
val coll = db.getCollection(collName)
coll.drop().toFuture()
/* create collection */
case CreateCollection(collName, None) => //SingleObservable
db.createCollection(collName).toFuture()
case CreateCollection(collName, Some(opt)) => //SingleObservable
db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions]).toFuture()
/* list collection
case ListCollection(dbName) => //ListConllectionObservable
client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
*/
/* create view */
case CreateView(viewName, viewOn, pline, None) => //SingleObservable
db.createView(viewName, viewOn, pline).toFuture()
case CreateView(viewName, viewOn, pline, Some(opt)) => //SingleObservable
db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions]).toFuture()
/* create index */
case CreateIndex(key, None) => //SingleObservable
coll.createIndex(key).toFuture().asInstanceOf[Future[Completed]] // asInstanceOf[SingleObservable[Completed]]
case CreateIndex(key, Some(opt)) => //SingleObservable
coll.createIndex(key, opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[Completed]] // asInstanceOf[SingleObservable[Completed]]
/* drop index */
case DropIndexByName(indexName, None) => //SingleObservable
coll.dropIndex(indexName).toFuture()
case DropIndexByName(indexName, Some(opt)) => //SingleObservable
coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions]).toFuture()
case DropIndexByKey(key, None) => //SingleObservable
coll.dropIndex(key).toFuture()
case DropIndexByKey(key, Some(opt)) => //SingleObservable
coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions]).toFuture()
case DropAllIndexes(None) => //SingleObservable
coll.dropIndexes().toFuture()
case DropAllIndexes(Some(opt)) => //SingleObservable
coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture()
}
}
catch { case e: Exception =>
log.error(s"mgoAdmin> runtime error: ${e.getMessage}")
throw new RuntimeException(s"mgoAdmin> Error: ${e.getMessage}")
}
}
/*
def mgoExecute[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] = {
val db = client.getDatabase(ctx.dbName)
val coll = db.getCollection(ctx.collName)
ctx.action match {
/* count */
case Count(Some(filter), Some(opt)) => //SingleObservable
coll.countDocuments(filter, opt.asInstanceOf[CountOptions])
.toFuture().asInstanceOf[Future[T]]
case Count(Some(filter), None) => //SingleObservable
coll.countDocuments(filter).toFuture()
.asInstanceOf[Future[T]]
case Count(None, None) => //SingleObservable
coll.countDocuments().toFuture()
.asInstanceOf[Future[T]]
/* distinct */
case Distict(field, Some(filter)) => //DistinctObservable
coll.distinct(field, filter).toFuture()
.asInstanceOf[Future[T]]
case Distict(field, None) => //DistinctObservable
coll.distinct((field)).toFuture()
.asInstanceOf[Future[T]]
/* find */
case Find(None, None, optConv, false) => //FindObservable
if (optConv == None) coll.find().toFuture().asInstanceOf[Future[T]]
else coll.find().map(optConv.get).toFuture().asInstanceOf[Future[T]]
case Find(None, None, optConv, true) => //FindObservable
if (optConv == None) coll.find().first().head().asInstanceOf[Future[T]]
else coll.find().first().map(optConv.get).head().asInstanceOf[Future[T]]
case Find(Some(filter), None, optConv, false) => //FindObservable
if (optConv == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]
else coll.find(filter).map(optConv.get).toFuture().asInstanceOf[Future[T]]
case Find(Some(filter), None, optConv, true) => //FindObservable
if (optConv == None) coll.find(filter).first().head().asInstanceOf[Future[T]]
else coll.find(filter).first().map(optConv.get).head().asInstanceOf[Future[T]]
case Find(None, Some(next), optConv, _) => //FindObservable
if (optConv == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]
else next(coll.find[Document]()).map(optConv.get).toFuture().asInstanceOf[Future[T]]
case Find(Some(filter), Some(next), optConv, _) => //FindObservable
if (optConv == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]
else next(coll.find[Document](filter)).map(optConv.get).toFuture().asInstanceOf[Future[T]]
/* aggregate AggregateObservable*/
case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
/* mapReduce MapReduceObservable*/
case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]
/* insert */
case Insert(docs, Some(opt)) => //SingleObservable[Completed]
if (docs.size > 1) coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).toFuture()
.asInstanceOf[Future[T]]
else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).toFuture()
.asInstanceOf[Future[T]]
case Insert(docs, None) => //SingleObservable
if (docs.size > 1) coll.insertMany(docs).toFuture().asInstanceOf[Future[T]]
else coll.insertOne(docs.head).toFuture().asInstanceOf[Future[T]]
/* delete */
case Delete(filter, None, onlyOne) => //SingleObservable
if (onlyOne) coll.deleteOne(filter).toFuture().asInstanceOf[Future[T]]
else coll.deleteMany(filter).toFuture().asInstanceOf[Future[T]]
case Delete(filter, Some(opt), onlyOne) => //SingleObservable
if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
/* replace */
case Replace(filter, replacement, None) => //SingleObservable
coll.replaceOne(filter, replacement).toFuture().asInstanceOf[Future[T]]
case Replace(filter, replacement, Some(opt)) => //SingleObservable
coll.replaceOne(filter, replacement, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
/* update */
case Update(filter, update, None, onlyOne) => //SingleObservable
if (onlyOne) coll.updateOne(filter, update).toFuture().asInstanceOf[Future[T]]
else coll.updateMany(filter, update).toFuture().asInstanceOf[Future[T]]
case Update(filter, update, Some(opt), onlyOne) => //SingleObservable
if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
/* bulkWrite */
case BulkWrite(commands, None) => //SingleObservable
coll.bulkWrite(commands).toFuture().asInstanceOf[Future[T]]
case BulkWrite(commands, Some(opt)) => //SingleObservable
coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).toFuture().asInstanceOf[Future[T]]
/* drop collection */
case DropCollection(collName) => //SingleObservable
val coll = db.getCollection(collName)
coll.drop().toFuture().asInstanceOf[Future[T]]
/* create collection */
case CreateCollection(collName, None) => //SingleObservable
db.createCollection(collName).toFuture().asInstanceOf[Future[T]]
case CreateCollection(collName, Some(opt)) => //SingleObservable
db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions]).toFuture().asInstanceOf[Future[T]]
/* list collection */
case ListCollection(dbName) => //ListConllectionObservable
client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
/* create view */
case CreateView(viewName, viewOn, pline, None) => //SingleObservable
db.createView(viewName, viewOn, pline).toFuture().asInstanceOf[Future[T]]
case CreateView(viewName, viewOn, pline, Some(opt)) => //SingleObservable
db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions]).toFuture().asInstanceOf[Future[T]]
/* create index */
case CreateIndex(key, None) => //SingleObservable
coll.createIndex(key).toFuture().asInstanceOf[Future[T]]
case CreateIndex(key, Some(opt)) => //SingleObservable
coll.createIndex(key, opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[T]]
/* drop index */
case DropIndexByName(indexName, None) => //SingleObservable
coll.dropIndex(indexName).toFuture().asInstanceOf[Future[T]]
case DropIndexByName(indexName, Some(opt)) => //SingleObservable
coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
case DropIndexByKey(key, None) => //SingleObservable
coll.dropIndex(key).toFuture().asInstanceOf[Future[T]]
case DropIndexByKey(key, Some(opt)) => //SingleObservable
coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
case DropAllIndexes(None) => //SingleObservable
coll.dropIndexes().toFuture().asInstanceOf[Future[T]]
case DropAllIndexes(Some(opt)) => //SingleObservable
coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
}
}
*/
}
object MongoActionStream {
import MGOClasses._
case class StreamingInsert[A](dbName: String,
collName: String,
converter: A => Document,
parallelism: Int = 1
) extends MGOCommands
case class StreamingDelete[A](dbName: String,
collName: String,
toFilter: A => Bson,
parallelism: Int = 1,
justOne: Boolean = false
) extends MGOCommands
case class StreamingUpdate[A](dbName: String,
collName: String,
toFilter: A => Bson,
toUpdate: A => Bson,
parallelism: Int = 1,
justOne: Boolean = false
) extends MGOCommands
case class InsertAction[A](ctx: StreamingInsert[A])(
implicit mongoClient: MongoClient) {
val database = mongoClient.getDatabase(ctx.dbName)
val collection = database.getCollection(ctx.collName)
def performOnRow(implicit ec: ExecutionContext): Flow[A, Document, NotUsed] =
Flow[A].map(ctx.converter)
.mapAsync(ctx.parallelism)(doc => collection.insertOne(doc).toFuture().map(_ => doc))
}
case class UpdateAction[A](ctx: StreamingUpdate[A])(
implicit mongoClient: MongoClient) {
val database = mongoClient.getDatabase(ctx.dbName)
val collection = database.getCollection(ctx.collName)
def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
if (ctx.justOne) {
Flow[A]
.mapAsync(ctx.parallelism)(a =>
collection.updateOne(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))
} else
Flow[A]
.mapAsync(ctx.parallelism)(a =>
collection.updateMany(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))
}
case class DeleteAction[A](ctx: StreamingDelete[A])(
implicit mongoClient: MongoClient) {
val database = mongoClient.getDatabase(ctx.dbName)
val collection = database.getCollection(ctx.collName)
def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
if (ctx.justOne) {
Flow[A]
.mapAsync(ctx.parallelism)(a =>
collection.deleteOne(ctx.toFilter(a)).toFuture().map(_ => a))
} else
Flow[A]
.mapAsync(ctx.parallelism)(a =>
collection.deleteMany(ctx.toFilter(a)).toFuture().map(_ => a))
}
}
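A quick sketch of wiring one of these actions into a stream; names and setup are assumed, following the implicits the actions require:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import org.mongodb.scala._
implicit val system = ActorSystem("stream-demo")
implicit val mat = ActorMaterializer()
implicit val ec = system.dispatcher
implicit val client: MongoClient = MongoClient()
// convert each element to a Document on its way into testdb/friends
val insertCtx = StreamingInsert[String]("testdb", "friends",
  name => Document("name" -> name), parallelism = 2)
Source(List("陈大文", "张小海"))
  .via(InsertAction(insertCtx).performOnRow)
  .runWith(Sink.ignore)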
object MGOHelpers {
implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] {
override val converter: (Document) => String = (doc) => doc.toJson
}
implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] {
override val converter: (C) => String = (doc) => doc.toString
}
trait ImplicitObservable[C] {
val observable: Observable[C]
val converter: (C) => String
def results(): Seq[C] = Await.result(observable.toFuture(), 10 seconds)
def headResult() = Await.result(observable.head(), 10 seconds)
def printResults(initial: String = ""): Unit = {
if (initial.length > 0) print(initial)
results().foreach(res => println(converter(res)))
}
def printHeadResult(initial: String = ""): Unit = println(s"${initial}${converter(headResult())}")
}
def getResult[T](fut: Future[T], timeOut: Duration = 1 second): T = {
Await.result(fut, timeOut)
}
def getResults[T](fut: Future[Iterable[T]], timeOut: Duration = 1 second): Iterable[T] = {
Await.result(fut, timeOut)
}
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
final class FutureToTask[A](x: => Future[A]) {
def asTask: Task[A] = Task.deferFuture[A](x)
}
final class TaskToFuture[A](x: => Task[A]) {
def asFuture: Future[A] = x.runAsync
}
}
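The implicit observable helpers above are convenient for quick experiments in a REPL. Note that both calls in this sketch block the current thread, so this is for exploration rather than production code:
import org.mongodb.scala._
import MGOHelpers._
implicit val client: MongoClient = MongoClient()
val coll = client.getDatabase("testdb").getCollection("friends")
coll.find().printResults("documents: ")   // each Document rendered via toJson
val first = coll.find().headResult()      // blocks for up to 10 seconds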
PetSound.scala
package petsound
import akka.actor._
import akka.cluster.client._
import com.typesafe.config.ConfigFactory
import akka.cluster.pubsub.DistributedPubSubMediator._
import akka.cluster.pubsub._
object Cat {
def props = Props[Cat]
def create(port: Int): ActorSystem = {
val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
.withFallback(ConfigFactory.load())
val system = ActorSystem("ClusterSystem",config)
val catSound = system.actorOf(props,"CatSound")
ClusterClientReceptionist(system).registerService(catSound)
val receptionist = ClusterClientReceptionist(system).underlying
system.actorOf(Props(classOf[ReceptionistListener],receptionist),"cat-event-listner")
system
}
}
object Dog {
def props = Props(new Dog)
def create(port: Int): ActorSystem = {
val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
.withFallback(ConfigFactory.load())
val system = ActorSystem("ClusterSystem",config)
val dogSound = system.actorOf(props,"DogSound")
ClusterClientReceptionist(system).registerService(dogSound)
val receptionist = ClusterClientReceptionist(system).underlying
system.actorOf(Props(classOf[ReceptionistListener],receptionist),"dog-event-listner")
system
}
}
class Cat extends Actor with ActorLogging {
// set up messaging via pub/sub
val mediator = DistributedPubSub(context.system).mediator
override def preStart() = {
mediator ! Subscribe("Shout", self)
super.preStart()
}
override def receive: Receive = {
case "Shout" =>
log.info("*******I am a cat, MIAOM ...******")
}
}
class Dog extends Actor with ActorLogging {
// set up messaging via pub/sub
val mediator = DistributedPubSub(context.system).mediator
override def preStart() = {
mediator ! Subscribe("Shout", self)
super.preStart()
}
override def receive: Receive = {
case "Shout" =>
log.info("*****I am a dog, WANG WANG...*****")
}
}
EventListener.scala
package petsound
import akka.actor._
import akka.cluster.client._
class ClientListener(clusterClient: ActorRef) extends Actor with ActorLogging {
override def preStart(): Unit = {
clusterClient ! SubscribeContactPoints
super.preStart()
}
override def receive: Receive = {
case ContactPoints(cps) =>
cps.map {ap => log.info(s"*******ContactPoints:${ap.address.toString}******")}
case ContactPointAdded(cp) =>
log.info(s"*******ContactPointAdded: ${cp.address.toString}*******")
case ContactPointRemoved(cp) =>
log.info(s"*******ContactPointRemoved: ${cp.address.toString}*******")
}
}
class ReceptionistListener(receptionist: ActorRef) extends Actor with ActorLogging {
override def preStart(): Unit = {
receptionist ! SubscribeClusterClients
super.preStart()
}
override def receive: Receive = {
case ClusterClients(cs) =>
cs.map{aref => println(s"*******ClusterClients: ${aref.path.address.toString}*******")}
case ClusterClientUp(cc) =>
log.info(s"*******ClusterClientUp: ${cc.path.address.toString}*******")
case ClusterClientUnreachable(cc) =>
log.info(s"*******ClusterClientUnreachable: ${cc.path.address.toString}*******")
}
}
MongoAdder.scala
package petsound
import akka.actor._
import com.typesafe.config._
import akka.actor.ActorSystem
import org.mongodb.scala._
import sdp.grpc.services.ProtoMGOContext
import sdp.mongo.engine.MGOClasses._
import sdp.mongo.engine.MGOEngine._
import sdp.result.DBOResult._
import akka.cluster.client._
import scala.collection.JavaConverters._
import scala.util._
class MongoAdder extends Actor with ActorLogging {
import monix.execution.Scheduler.Implicits.global
implicit val mgosys = context.system
implicit val ec = mgosys.dispatcher
val clientSettings: MongoClientSettings = MongoClientSettings.builder()
.applyToClusterSettings {b =>
b.hosts(List(new ServerAddress("localhost:27017")).asJava)
}.build()
implicit val client: MongoClient = MongoClient(clientSettings)
val ctx = MGOContext("testdb","friends")
override def receive: Receive = {
case someProto @ Some(proto:ProtoMGOContext) =>
val ctx = MGOContext.fromProto(proto)
log.info(s"****** received MGOContext: $someProto *********")
val task = mgoUpdate[Completed](ctx).toTask
task.runOnComplete {
case Success(s) => println("operations completed successfully.")
case Failure(exception) => println(s"error: ${exception.getMessage}")
}
}
}
object MongoAdder {
def create(port: Int): ActorSystem = {
val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
.withFallback(ConfigFactory.load())
val system = ActorSystem("ClusterSystem", config)
val mongoAdder = system.actorOf(Props[MongoAdder],"MongoAdder")
ClusterClientReceptionist(system).registerService(mongoAdder)
val receptionist = ClusterClientReceptionist(system).underlying
system.actorOf(Props(classOf[ReceptionistListener],receptionist),"mongo-event-listner")
system
}
}
PetHouse.scala
package petsound
import akka.actor._
import akka.japi.Util.immutableSeq
import akka.actor.AddressFromURIString
import com.typesafe.config.ConfigFactory
import akka.cluster.client._
import akka.cluster.client.ClusterClient._
object PetHouse extends App {
val sysCat = Cat.create(2551)
val sysDog = Dog.create(2552)
val mongo = MongoAdder.create(2555)
scala.io.StdIn.readLine()
sysCat.terminate()
sysDog.terminate()
mongo.terminate()
}
object PetClient extends App {
val conf = ConfigFactory.load("client")
val clientSystem = ActorSystem("ClientSystem",conf)
/* Read the contact-points addresses from the conf file
val initialContacts = immutableSeq(conf.getStringList("contact-points")).map {
case AddressFromURIString(addr) ⇒ RootActorPath(addr) / "system" / "receptionist"
}.toSet
*/
// seed with a single contact point; the system discovers the others automatically
val initialContacts = Set(
ActorPaths.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist")
)
val clusterClient = clientSystem.actorOf(
ClusterClient.props(
ClusterClientSettings(clientSystem)
.withInitialContacts(initialContacts)), "petClient")
clientSystem.actorOf(Props(classOf[ClientListener],clusterClient),"client-event-listner")
clusterClient ! Send("/user/CatSound","Shout",localAffinity = true)
clusterClient ! Send("/user/DogSound","Shout",localAffinity = true)
println(s"sent shout messages ...")
scala.io.StdIn.readLine()
clusterClient ! Publish("Shout","Shout")
println(s"publish shout messages ...")
// MongoDB operation demo
import org.mongodb.scala._
import sdp.mongo.engine.MGOClasses._
val ctx = MGOContext("testdb","friends")
val chen = Document("姓" -> "陈", "名" -> "大文","age" -> 28)
val zhang = Document("姓" -> "张", "名" -> "小海","age" -> 7)
val lee = Document("姓" -> "李", "名" -> "四","age" -> 45)
val ouyang = Document("姓" -> "欧阳", "名" -> "锋","age" -> 120)
val c = ctx.setCommand(MGOCommands.Insert(Seq(chen,zhang,lee,ouyang)))
clusterClient ! Send("/user/MongoAdder",c.toSomeProto,localAffinity = true)
scala.io.StdIn.readLine()
clientSystem.terminate()
}
Original article: https://www.cnblogs.com/tiger-xc/p/10094659.html
Akka Source Code Analysis - Cluster - ClusterClient
ClusterClient lets you communicate with a cluster from a node that is not itself a member of it. All it needs is the location of one or more nodes to use as initial contact points. It establishes a connection to a ClusterReceptionist in order to send messages to specific actors inside the cluster, and the actor provider must be set to remote or cluster. The receptionist needs to run on all nodes of the cluster, or on a designated group of nodes; it can be started on its own or through the ClusterClientReceptionist extension. The actors a ClusterClient can talk to are those registered through the ClusterClientReceptionist extension.
Reading this, you may be tempted to grumble: a feature this simple, I could write myself. That is akka for you, though. For functionality that looks trivial, what the framework ships is more stable and more general, if not necessarily the fastest. Enough preamble; let's look at the actual implementation of ClusterClient.
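As a reminder of the provider requirement, the client node's configuration might look roughly like the sketch below. This is an assumption for the classic-remoting era this code targets, not a copy of the project's client.conf:
import com.typesafe.config.ConfigFactory
// the client is NOT a cluster member, so the remote provider suffices;
// port 0 lets the OS pick a free port
val clientConf = ConfigFactory.parseString(
  """
    |akka.actor.provider = "remote"
    |akka.remote.netty.tcp.hostname = "127.0.0.1"
    |akka.remote.netty.tcp.port = 0
  """.stripMargin)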
/**
* This actor is intended to be used on an external node that is not member
* of the cluster. It acts like a gateway for sending messages to actors
* somewhere in the cluster. From the initial contact points it will establish
* a connection to a [[ClusterReceptionist]] somewhere in the cluster. It will
* monitor the connection to the receptionist and establish a new connection if
* the link goes down. When looking for a new receptionist it uses fresh contact
* points retrieved from previous establishment, or periodically refreshed
* contacts, i.e. not necessarily the initial contact points.
*
* You can send messages via the `ClusterClient` to any actor in the cluster
* that is registered in the [[ClusterReceptionist]].
* Messages are wrapped in [[ClusterClient.Send]], [[ClusterClient.SendToAll]]
* or [[ClusterClient.Publish]].
*
* Use the factory method [[ClusterClient#props]]) to create the
* [[akka.actor.Props]] for the actor.
*
* If the receptionist is not currently available, the client will buffer the messages
* and then deliver them when the connection to the receptionist has been established.
* The size of the buffer is configurable and it can be disabled by using a buffer size
* of 0. When the buffer is full old messages will be dropped when new messages are sent
* via the client.
*
* Note that this is a best effort implementation: messages can always be lost due to the distributed
* nature of the actors involved.
*/
final class ClusterClient(settings: ClusterClientSettings) extends Actor with ActorLogging
Judging from ClusterClient's definition and the official comments, it is just an ordinary actor that communicates with a particular actor in the cluster (the ClusterReceptionist). Through the initial contact points (which are really just ActorPaths) it sends messages to a ClusterReceptionist inside the cluster, and it monitors the connection to the receptionist to make sure the link stays up. ClusterClient does not override preStart, so let's look at its primary constructor.
sendGetContacts()
scheduleRefreshContactsTick(establishingGetContactsInterval)
self ! RefreshContactsTick
The constructor makes the three calls above, in order.
def sendGetContacts(): Unit = {
val sendTo =
if (contacts.isEmpty) initialContactsSel
else if (contacts.size == 1) initialContactsSel union contacts
else contacts
if (log.isDebugEnabled)
log.debug(s"""Sending GetContacts to [${sendTo.mkString(",")}]""")
sendTo.foreach { _ ! GetContacts }
}
sendGetContacts is simple: it sends a GetContacts message to the current contact points.
def scheduleRefreshContactsTick(interval: FiniteDuration): Unit = {
refreshContactsTask foreach { _.cancel() }
refreshContactsTask = Some(context.system.scheduler.schedule(
interval, interval, self, RefreshContactsTick))
}
scheduleRefreshContactsTick starts a timer that fires after interval and then every interval, sending the actor itself a RefreshContactsTick message.
The third call just sends RefreshContactsTick to self. The last two calls feel somewhat redundant: setting the timer's initial delay to 0 would cover the immediate tick and make the third call unnecessary, as in the sketch below.
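The suggestion as a sketch, written as a fragment inside the actor (not actual source):
import context.dispatcher
import scala.concurrent.duration._
// a zero initial delay fires the first tick immediately, making the
// explicit `self ! RefreshContactsTick` in the constructor unnecessary
refreshContactsTask = Some(context.system.scheduler.schedule(
  Duration.Zero, establishingGetContactsInterval, self, RefreshContactsTick))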
case RefreshContactsTick ⇒ sendGetContacts()
So what happens on RefreshContactsTick? sendGetContacts is called again. Which begs the question: why call sendGetContacts directly in the primary constructor at all?
var contactPaths: HashSet[ActorPath] =
initialContacts.to[HashSet]
val initialContactsSel =
contactPaths.map(context.actorSelection)
var contacts = initialContactsSel
initialContactsSel, contacts, contactPaths, initialContacts: quite the family of look-alikes, no?
initialContactsSel is the crucial one: it maps initialContacts into ActorSelections, and the client also sends the initial contacts an Identify message. But these ActorPaths point at remote actors; how can they be selected at all? Remember what was said earlier: the provider must be configured as remote or cluster. Why? Take a guess.
case ActorIdentity(_, Some(receptionist)) ⇒
log.info("Connected to [{}]", receptionist.path)
scheduleRefreshContactsTick(refreshContactsInterval)
sendBuffered(receptionist)
context.become(active(receptionist) orElse contactPointMessages)
connectTimerCancelable.foreach(_.cancel())
failureDetector.heartbeat()
self ! HeartbeatTick // will register us as active client of the selected receptionist
Upon receiving ActorIdentity, the client calls scheduleRefreshContactsTick to reset the timer, flushes its buffered messages to the receptionist, and switches its behavior to active. From this point on it can forward messages to specific actors in the cluster via Send, SendToAll and Publish.
def active(receptionist: ActorRef): Actor.Receive = {
case Send(path, msg, localAffinity) ⇒
receptionist forward DistributedPubSubMediator.Send(path, msg, localAffinity)
case SendToAll(path, msg) ⇒
receptionist forward DistributedPubSubMediator.SendToAll(path, msg)
case Publish(topic, msg) ⇒
receptionist forward DistributedPubSubMediator.Publish(topic, msg)
case HeartbeatTick ⇒
if (!failureDetector.isAvailable) {
log.info("Lost contact with [{}], reestablishing connection", receptionist)
reestablish()
} else
receptionist ! Heartbeat
case HeartbeatRsp ⇒
failureDetector.heartbeat()
case RefreshContactsTick ⇒
receptionist ! GetContacts
case Contacts(contactPoints) ⇒
// refresh of contacts
if (contactPoints.nonEmpty) {
contactPaths = contactPoints.map(ActorPath.fromString).to[HashSet]
contacts = contactPaths.map(context.actorSelection)
}
publishContactPoints()
case _: ActorIdentity ⇒ // ok, from previous establish, already handled
case ReceptionistShutdown ⇒
if (receptionist == sender()) {
log.info("Receptionist [{}] is shutting down, reestablishing connection", receptionist)
reestablish()
}
}
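Before summing up, here is a stripped-down sketch of the identification handshake that precedes this active behavior, as a fragment inside the client actor (simplified, names assumed, not the actual source):
// ask every known contact to identify itself; the first reply wins
contacts.foreach(_ ! Identify(None))
def establishing: Actor.Receive = {
  case ActorIdentity(_, Some(receptionist)) =>
    // the first responder becomes the active receptionist
    context.become(active(receptionist) orElse contactPointMessages)
  case ActorIdentity(_, None) => // that contact path did not resolve; keep waiting
}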
To sum up ClusterClient's behavior: using the configured initialContacts, it resolves the remote actors (the ClusterReceptionists inside the cluster) via ActorSelection and sends them an Identify message; on the first ActorIdentity reply it considers itself connected to the cluster (the remaining ActorIdentity replies are ignored, so effectively the fastest responder becomes the contact). It then periodically sends GetContacts to that first responder to fetch the locations of all ClusterReceptionist instances. So how does it decide the contact has been lost? That is what HeartbeatTick is for.
val heartbeatTask = context.system.scheduler.schedule(
heartbeatInterval, heartbeatInterval, self, HeartbeatTick)
We skipped the definition of heartbeatTask earlier; it is simply a timer that sends the actor a HeartbeatTick message every heartbeatInterval. (Personally, I dislike running code inside variable definitions like this; it makes source analysis harder.)
On HeartbeatTick the client sends Heartbeat to the receptionist, and on HeartbeatRsp it records the heartbeat in the failureDetector. If the failureDetector reports the contact as unavailable, reestablish() is called to set up a new connection.
That is all for the ClusterClient side. Now let's look at the ClusterReceptionist implementation inside the cluster. As mentioned, it can be started via actorOf or via the ClusterClientReceptionist extension; naturally we look at the extension first.
object ClusterClientReceptionist extends ExtensionId[ClusterClientReceptionist] with ExtensionIdProvider {
override def get(system: ActorSystem): ClusterClientReceptionist = super.get(system)
override def lookup() = ClusterClientReceptionist
override def createExtension(system: ExtendedActorSystem): ClusterClientReceptionist =
new ClusterClientReceptionist(system)
}
Above is the ExtensionId definition. Clearly it also extends ExtensionIdProvider, which means the extension can be enabled purely through configuration, with no explicit startup code.
/**
* Extension that starts [[ClusterReceptionist]] and accompanying [[akka.cluster.pubsub.DistributedPubSubMediator]]
* with settings defined in config section `akka.cluster.client.receptionist`.
* The [[akka.cluster.pubsub.DistributedPubSubMediator]] is started by the [[akka.cluster.pubsub.DistributedPubSub]] extension.
*/
final class ClusterClientReceptionist(system: ExtendedActorSystem) extends Extension
Notice how the important classes all carry very clear official comments? This extension starts the ClusterReceptionist along with the DistributedPubSubMediator; the mediator itself is started by the DistributedPubSub extension, which we will analyze later.
/**
* The [[ClusterReceptionist]] actor
*/
private val receptionist: ActorRef = {
if (isTerminated)
system.deadLetters
else {
val name = config.getString("name")
val dispatcher = config.getString("use-dispatcher") match {
case "" ⇒ Dispatchers.DefaultDispatcherId
case id ⇒ id
}
// important to use val mediator here to activate it outside of ClusterReceptionist constructor
val mediator = pubSubMediator
system.systemActorOf(ClusterReceptionist.props(mediator, ClusterReceptionistSettings(config))
.withDispatcher(dispatcher), name)
}
}
/**
* Returns the underlying receptionist actor, particularly so that its
* events can be observed via subscribe/unsubscribe.
*/
def underlying: ActorRef =
receptionist
/**
* Register the actors that should be reachable for the clients in this [[DistributedPubSubMediator]].
*/
private def pubSubMediator: ActorRef = DistributedPubSub(system).mediator
The source above, from the ClusterClientReceptionist definition, is the key part: it starts a ClusterReceptionist. The rest of the class deals with registering and unregistering services, which we can skip for now.
/**
* [[ClusterClient]] connects to this actor to retrieve. The `ClusterReceptionist` is
* supposed to be started on all nodes, or all nodes with specified role, in the cluster.
* The receptionist can be started with the [[ClusterClientReceptionist]] or as an
* ordinary actor (use the factory method [[ClusterReceptionist#props]]).
*
* The receptionist forwards messages from the client to the associated [[akka.cluster.pubsub.DistributedPubSubMediator]],
* i.e. the client can send messages to any actor in the cluster that is registered in the
* `DistributedPubSubMediator`. Messages from the client are wrapped in
* [[akka.cluster.pubsub.DistributedPubSubMediator.Send]], [[akka.cluster.pubsub.DistributedPubSubMediator.SendToAll]]
* or [[akka.cluster.pubsub.DistributedPubSubMediator.Publish]] with the semantics described in
* [[akka.cluster.pubsub.DistributedPubSubMediator]].
*
* Response messages from the destination actor are tunneled via the receptionist
* to avoid inbound connections from other cluster nodes to the client, i.e.
* the `sender()`, as seen by the destination actor, is not the client itself.
* The `sender()` of the response messages, as seen by the client, is `deadLetters`
* since the client should normally send subsequent messages via the `ClusterClient`.
* It is possible to pass the original sender inside the reply messages if
* the client is supposed to communicate directly to the actor in the cluster.
*
*/
final class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterReceptionistSettings)
extends Actor with ActorLogging
Together with our earlier analysis and the official comments, this actor is easy to understand. It is the one ClusterClient sends GetContacts to; it is supposed to run on all nodes of the cluster, or on a group of nodes with a given role. It can be started through the ClusterClientReceptionist extension, or as an ordinary actor (via actorOf). The ClusterReceptionist forwards the messages relayed by a ClusterClient on to the DistributedPubSubMediator, i.e. to the services we registered. Replies from the destination actor are tunneled back to the client through the receptionist, which in effect rewrites the sender.
The definition also shows that it is a perfectly ordinary actor. Neither the primary constructor nor preStart contains anything special, so we go straight to receive.
case GetContacts ⇒
// Consistent hashing is used to ensure that the reply to GetContacts
// is the same from all nodes (most of the time) and it also
// load balances the client connections among the nodes in the cluster.
if (numberOfContacts >= nodes.size) {
val contacts = Contacts(nodes.map(a ⇒ self.path.toStringWithAddress(a))(collection.breakOut))
if (log.isDebugEnabled)
log.debug("Client [{}] gets contactPoints [{}] (all nodes)", sender().path, contacts.contactPoints.mkString(","))
sender() ! contacts
} else {
// using toStringWithAddress in case the client is local, normally it is not, and
// toStringWithAddress will use the remote address of the client
val a = consistentHash.nodeFor(sender().path.toStringWithAddress(cluster.selfAddress))
val slice = {
val first = nodes.from(a).tail.take(numberOfContacts)
if (first.size == numberOfContacts) first
else first union nodes.take(numberOfContacts - first.size)
}
val contacts = Contacts(slice.map(a ⇒ self.path.toStringWithAddress(a))(collection.breakOut))
if (log.isDebugEnabled)
log.debug("Client [{}] gets contactPoints [{}]", sender().path, contacts.contactPoints.mkString(","))
sender() ! contacts
}
The handling of GetContacts deserves close attention; after all, this is the message a ClusterClient sends to discover services inside the cluster. The comment on the first if branch is also clear: consistent hashing ensures that (most of the time) every node returns the same reply to GetContacts, and it also load-balances client connections across the cluster's nodes.
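The same idea in isolation, using akka's ConsistentHash directly; the addresses here are made up for the sketch:
import akka.actor.Address
import akka.routing.ConsistentHash
// two receptionist nodes on the hash ring; virtualNodesFactor smooths the distribution
val nodes = Set(
  Address("akka.tcp", "ClusterSystem", "127.0.0.1", 2551),
  Address("akka.tcp", "ClusterSystem", "127.0.0.1", 2552))
val ring = ConsistentHash(nodes, virtualNodesFactor = 10)
// every node maps the same client key to the same starting receptionist
val chosen = ring.nodeFor("akka.tcp://ClientSystem@127.0.0.1:2553/user/petClient")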
case msg @ (_: Send | _: SendToAll | _: Publish) ⇒
val tunnel = responseTunnel(sender())
tunnel ! Ping // keep alive
pubSubMediator.tell(msg, tunnel)
Above is the handling of Send, SendToAll and Publish. It appears the message is simply handed to pubSubMediator, and here the "tunneling" mentioned in the earlier comment makes its appearance.
def responseTunnel(client: ActorRef): ActorRef = {
val encName = URLEncoder.encode(client.path.toSerializationFormat, "utf-8")
context.child(encName) match {
case Some(tunnel) ⇒ tunnel
case None ⇒
context.actorOf(Props(classOf[ClientResponseTunnel], client, responseTunnelReceiveTimeout), encName)
}
}
What is this doing, creating yet another actor, ClientResponseTunnel, and using it as the sender of the service's replies? And with a responseTunnelReceiveTimeout on top?
/**
* Replies are tunneled via this actor, child of the receptionist, to avoid
* inbound connections from other cluster nodes to the client.
*/
class ClientResponseTunnel(client: ActorRef, timeout: FiniteDuration) extends Actor with ActorLogging {
context.setReceiveTimeout(timeout)
private val isAsk = {
val pathElements = client.path.elements
pathElements.size == 2 && pathElements.head == "temp" && pathElements.tail.head.startsWith("$")
}
def receive = {
case Ping ⇒ // keep alive from client
case ReceiveTimeout ⇒
log.debug("ClientResponseTunnel for client [{}] stopped due to inactivity", client.path)
context stop self
case msg ⇒
client.tell(msg, Actor.noSender)
if (isAsk)
context stop self
}
}
This actor's job is simple: it forwards replies to the client. Admittedly, this is rather convoluted: a local proxy actor is created for each client, and every reply travels back through it. Why not send replies to the client straight from the service's node? On reflection, this is entirely reasonable, even necessary: the node hosting the service may have no network route to the client, or direct communication may be forbidden for security reasons, in which case relaying replies through this proxy is essential. In any case: whatever akka does, akka does right.
case Heartbeat ⇒
if (verboseHeartbeat) log.debug("Heartbeat from client [{}]", sender().path)
sender() ! HeartbeatRsp
updateClientInteractions(sender())
Then there is the handling of the client's Heartbeat messages. The logic is trivial, but note one thing: updateClientInteractions maintains a list of clients, meaning every ClusterReceptionist keeps its own client list. Frankly, I am not a fan: clients can be numerous, and merely maintaining that list can consume a lot of memory. It enables rich functionality, but it is also an easy path to OOM. Then again, if client counts stay small in practice, perhaps it just means akka has not yet been widely adopted by large companies.
That wraps up ClusterClient. The sharp-eyed reader will ask: we still have not seen how a message travels from the ClusterReceptionist to the actual service actor; how does pubSubMediator.tell(msg, tunnel) route the message? True, but be patient: that comes in the next chapter (DistributedPubSubMediator). After all, the official documentation for ClusterClient itself recommends DistributedPubSubMediator for this kind of functionality. Which, frankly, feels like another trap: if you are going to recommend DistributedPubSubMediator anyway, why keep the ClusterClient module at all? Just deprecate it.
Android Tool HierarchyViewer Code Walkthrough (2) -- Setting Up an Eclipse Debugging Environment
In the previous article <Android Tool HierarchyViewer Code Walkthrough (1) -- Demonstrating the Feature Implementation>, we introduced how HierarchyViewer's main technical points are implemented. Although we have not yet touched HierarchyViewer's source code, with the knowledge from that article a reader could already build a HierarchyViewer of their own.
This installment is lighter: we will show how to import the HierarchyViewer project and its dependencies from the Android source tree into Eclipse. Reading and debugging through Eclipse will make our study far more efficient; sharpening the axe does not delay the woodcutting.
If you have not installed Eclipse, download Eclipse IDE for Java Developers from the Eclipse website. This walkthrough is based on Android 4.0 ICS. There is already plenty of material online about downloading and building the source, so we will not repeat it here; since the main server can be difficult to sync from for various reasons, we recommend downloading from the codeaurora.org mirror instead (see <Switching the repo Source to codeaurora.org to Fix Slow Syncing>).
1. Import HierarchyViewer and hierarchyviewerlib
Open Eclipse, go to File -> Import -> Existing Projects into Workspace, and click Next.
Import the hierarchyviewer project from ~/Android-Source/sdk/hierarchyviewer2/app (the author's Android source tree is located at ~/Android-Source).
Repeat the steps above to import the hierarchyviewerlib project from ~/Android-Source/sdk/hierarchyviewer2/libs/hierarchyviewerlib.
2. Import the ddmlib and ddmuilib projects
ddmlib and ddmuilib are packages shared by many Android SDK tools. You can choose not to import these two projects and reference the jar files directly instead: if you have already built the Android source, ddmlib.jar and ddmuilib.jar can be found under ~/Android-Source/out/host/linux-x86/framework/, or under the \tools\lib directory of the Android SDK.
ddmlib contains the adb API. If you are interested in how adb is initialized and how it communicates, it is best to import both projects, from the following directories:
~/Android-Source/sdk/ddms/libs/ddmlib
~/Android-Source/sdk/ddms/libs/ddmuilib
After importing, they may fail to compile. This is because the overriding methods in the source lack @Override annotations, which Eclipse treats as an error by default. Adjust the project settings as follows:
Open the project properties dialog for ddmlib and ddmuilib, go to Java Compiler -> Errors/Warnings, and under the Annotations node change the severity of "Missing '@Override' annotation" from "Error" to "Warning" or "Ignore".
3. Add jar file references
Finally, add the shared jar references to each project. All of these jars can be found under ~/Android-Source/out/host/linux-x86/framework/ or Android-SDK\tools\lib:
References to add to ddmuilib:
References to add to the hierarchyviewerlib project:
References to add to the HierarchyViewer project:
Note in particular that swt.jar exists in two versions under Android-SDK\tools\lib, in the x86 and x86_64 directories. You must choose the one matching whether your machine's JRE is 32-bit or 64-bit; with the wrong version the build succeeds but the program will not run.
4. Launch under the debugger
At this point every project compiles. Launch HierarchyViewer in debug mode, using com.android.hierarchyviewer as the entry point:
5. Reading the source online
Finally, a recommendation for reading Android source code online: http://androidxref.com/. The site offers very convenient search, symbol cross-reference, and type-definition navigation. You cannot debug there, but it is still a good option.
This article was written by Liu Binhua of Zhiping Software. Please credit the source when reposting.
Zhiping Software is dedicated to research on automated-testing technology for mobile platforms. We hope that contributing knowledge and open-source projects to the community will advance both the industry and ourselves.