Akka Extension(扩展)

Akka Extension是Akka提供的一套可插拔的、用来增强Akka能力的机制,akka-cluster等很多内建功能也是基于它实现的。同时,Akka Extension还提供了某种程度上的依赖管理功能,Fusion也基于它实现了**akka-fusion**框架的模块化管理。

Akka Extension提供了两个基本组件:ExtensionExtensionId。每个Akka扩展在同一个ActorSystem内保证只加载一次,你可以选择按需加载,也可以选择随ActorSystem创建时即加载。有关这部分的内容参考接下来的 从配置加载

这样,在你的应用中只需要全局保证一个ActorSystem即可,其它的服务、资源都可以通过 Akka Extension 来维护。同时,你可以很自然的在自己的Akka Extension实例内部引用其它的Akka Extension(需要保证它们都使用同一个ActorSystem)。 这可能是在不使用IOC(如Spring、Guice等)情况下最好的进行依赖管理机制之一。

从配置加载

akka {
  # 用于为Akka制作的第三方扩展库,需要随ActorSystem一起加载。
  # 若最终用户需要在`application.conf`中配置扩展,应使用`extensions`属性配置。
  library-extensions = ${?akka.library-extensions} ["akka.serialization.SerializationExtension"]

  # 在此列出需要加载的多个自定义扩展,需要使用类的全限定名。
  extensions = []
}

构建扩展

akka-fusion在提供了 FusionExtension 帮助trait来构建Akka Extension。

trait FusionExtension extends Extension {
  val classicSystem: ExtendedActorSystem

  val configuration: Configuration = Configuration(classicSystem.settings.config)
  def typedSystem: ActorSystem[Nothing] = classicSystem.toTyped
}

FusionExtension在默认Extension基础之上添加了ActorSystem[T]akka.actor.ActorSystem引用,提供了Configuration(对Lightbend Config的增强包装)。

通过Akka Extension来管理资源

接下来为FusionJdbc作为示例,说明FusionExtension是怎样来管理我们的数据库访问资源的。FusionJdbc管理了一个或多个数据库连接池,连接池通过 HikariCP 实现。

class FusionJdbc private (override val classicSystem: ExtendedActorSystem) extends FusionExtension {
  val components = new JdbcComponents(classicSystem)
  FusionCore(classicSystem).shutdowns.beforeActorSystemTerminate("StopFusionJdbc") { () =>
    components.closeAsync()(classicSystem.dispatcher)
  }
  def component: HikariDataSource = components.component
}

object FusionJdbc extends FusionExtensionId[FusionJdbc] {
  override def createExtension(system: ExtendedActorSystem): FusionJdbc = new FusionJdbc(system)
}

FusionJdbc将由Akka保证在同一个ActorSystem中只被实例化一次,就像Spring框架里的@Service注解、Guice框架的Singleton注解一样,它们都是 单例

final private[jdbc] class JdbcComponents(system: ExtendedActorSystem)
    extends Components[HikariDataSource](JdbcConstants.PATH_DEFAULT) {
  override def configuration: Configuration = Configuration(system.settings.config)

  override protected def componentClose(c: HikariDataSource): Future[Done] = Future.successful {
    c.close()
    Done
  }

  override protected def createComponent(id: String): HikariDataSource =
    JdbcUtils.createHikariDataSource(configuration.getConfig(id))
}

JdbcComponents继承了ComponentsComponents提供了一个保存同一类型组件的多个实例的优秀方案。它基于 Lightbend Config 实现了可配置化,通过构造函数传入的配置路径(id)来决定引用哪一个配置,并保存id的实例的对应关系。

请关注FusionCore(system).shutdowns.beforeActorSystemTerminate这行代码,它使用CoordinatedShutdown来协调资源的关闭,它将在ActorSystem终止前关闭所有数据库连接池。更多内容请参阅: FusionCore#shutdowns

Components

Components 提供代码实现如下:

abstract class Components[T](DEFAULT_ID: String) extends StrictLogging {
  protected val components = mutable.Map.empty[String, T]

  def configuration: Configuration

  protected def createComponent(id: String): T
  protected def componentClose(c: T): Future[Done]

  def component: T = lookup(DEFAULT_ID)

  final def lookup(id: String): T = synchronized(lookupComponent(id))

  protected def lookupComponent(id: String): T = components.getOrElseUpdate(id, createComponent(id))

  final def register(id: String, other: T, replaceExists: Boolean = false): T =
    synchronized(registerComponent(id, other, replaceExists))

  protected def registerComponent(id: String, other: T, replaceExists: Boolean): T = {
    require(id != DEFAULT_ID, s"id不能为默认配置ID,$id == $DEFAULT_ID")
    val isReplace = configuration.getOrElse(id + ".replace-exists", replaceExists)
    components.get(id).foreach {
      case c if isReplace =>
        try {
          Await.ready(componentClose(c), 30.seconds)
        } catch {
          case e: Throwable =>
            logger.error(s"registerComponent replace exists component 30s timeout error: ${e.toString};id: $id", e)
        }
        components.remove(id)
      case _ =>
        throw new IllegalAccessException(s"id重复,$id")
    }
    components.put(id, other)
    other
  }

  def closeAsync()(implicit ec: ExecutionContext): Future[Done] = synchronized {
    Future.sequence(components.valuesIterator.map(componentClose).toList).map(_ => Done)
  }
}

通过Akka Extension来管理服务(依赖)

修定你有3个服务:

  • FileService:统一的文件服务,如提供用户头像链接
  • UserService:用户服务
  • LoginService:实现用户登录、注册等业务逻辑

你可以如下定义3个服务

/*
 * Copyright 2019 helloscala.com
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package docs.extension.customservice

import akka.actor.typed.ActorSystem
import fusion.common.extension.{ TypedExtension, TypedExtensionId }
import helloscala.common.exception.HSUnauthorizedException
import helloscala.common.util.{ DigestUtils, StringUtils }

import scala.concurrent.Future

case class LoginDTO(account: String, password: String)
case class LoginBO(id: String, nickname: String)
case class UserBO(id: String, nickname: String, avatarId: String, avatarUrl: String)
case class UserDO(id: String, nickname: String, avatarId: String, password: String, salt: String)

class UserRepository {
  def findByAccount(account: String): Future[UserDO] =
    Future.successful(UserDO(StringUtils.randomString(24), account, StringUtils.randomString(24), "password", "salt"))

  def findById(id: String): Future[UserDO] =
    Future.successful(UserDO(id, "用户", StringUtils.randomString(24), "password", "salt"))
}

class FileService private (override val typedSystem: ActorSystem[Nothing]) extends TypedExtension {
  def findUrlById(fileId: String): Future[String] = Future.successful {
    s"http://localhost:9999/file/$fileId.png"
  }
}

object FileService extends TypedExtensionId[FileService] {
  override def createExtension(system: ActorSystem[_]): FileService = new FileService(system)
}

class UserService private (override val typedSystem: ActorSystem[Nothing]) extends TypedExtension {
  import typedSystem.executionContext
  private val fileService = FileService(typedSystem)
  private val userRepository = new UserRepository()

  def findBOById(id: String): Future[UserBO] = {
    userRepository.findById(id).flatMap { user =>
      fileService.findUrlById(user.avatarId).map { url =>
        UserBO(user.id, user.nickname, user.avatarId, url)
      }
    }
  }

  def findByAccount(account: String): Future[UserDO] = {
    userRepository.findByAccount(account)
  }
}

object UserService extends TypedExtensionId[UserService] {
  override def createExtension(system: ActorSystem[_]): UserService = new UserService(system)
}

class LoginService private (override val typedSystem: ActorSystem[Nothing]) extends TypedExtension {
  import typedSystem.executionContext
  private val userService = UserService(typedSystem)

  def login(dto: LoginDTO): Future[LoginBO] = {
    userService.findByAccount(dto.account).map {
      case user if user.password == DigestUtils.sha256Hex(dto.password + user.salt) =>
        LoginBO(user.id, user.nickname)
      case _ =>
        throw HSUnauthorizedException("密码不匹配")
    }
  }
}

object LoginService extends TypedExtensionId[LoginService] {
  override def createExtension(system: ActorSystem[_]): LoginService = new LoginService(system)
}

通过以上代码,你看到了怎样使用Akka Extension来实现服务的依赖管理。所有的服务之间只有一个显示依赖:ActorSystem。因为我们的框架是基于Akka的,所以我们认为显示依赖ActorSystem并不是一个问题。