学习视频来源:DDD独家秘籍视频合集 https://space.bilibili.com/24690212/channel/collectiondetail?sid=1940048&ctype=0
源代码地址:https://github.com/ByteBlizzard
本篇文章是讲视频中的3期内容合并为一起。
文章目录
- 需求
- 模型
- 命令
- 聚合
- 领域事件
- 查询
- 领域外功能
- 源码
- 1.项目结构
- 2.领域模块
- 命令
- 聚合对象
- 领域事件
- 领域事件队列定义
- 领域事件监听器定义
- 仓储定义
- 报警策略监听器
- 报警策略定义
- 3.领域外模块
- 接口
- 命令代理类
- 领域事件分发器
- 领域事件队列实现
- 领域事件监听器实现
- 仓储接口实现
- 报警策略实现
- 查询
需求
做一个停车计费程序,具体需求如下:
- 车牌识别系统会在车辆入场和出场的时候调用计费程序
- 付费后15分钟内可以离场,超过15分钟要补费
- 车辆入场出场失败的时候,要给管理员报警
- 计费程序提供查询当前某个车牌号应付款
- 支付系统通知计费程序车辆已付款
- 用户要能够查看某个车牌号过去的停车记录
- 管理员要查看在场车辆总数,每日营业额
- 计费规则是每小时1块,不足一小时当成1小时
- 黑名单限制
模型
命令
车辆入场、车辆出场、计算费用、付费、添加车辆到黑名单、从黑名单移除车辆。
聚合
聚合对象:停车
领域事件
车辆已出场、车辆已退场、车辆已付费、出场失败、入场失败。可以看到领域事件都是已发生确定的有意义事件 。
查询
在场数量、停车记、每日收入。
领域外功能
报警策略。和仓储一样,属于外的东西,只在领域内定义和使用接口,但是具体实现是在领域外。
源码
1.项目结构
项目分为领域模块和领域外模块,领域外模块依赖领域模块。
领域模块是我们的核心功能,包含命令、处理命令的handler、聚合、领域事件、存储和发送领域事件队列的定义、领域事件监听器的定义、仓储相关定义、报警策略定义。
领域外模块包括接口、仓储、报警策略实现、仓储相关接口的实现、发送领域事件队列的实现、领域事件监听器实现。
2.领域模块
命令
定义一个命令和处理这个命令的处理器。命令包括车牌号和入场时间。处理器通过车牌号查询聚合对象(parking),调用聚合的处理入场命令的方法,并保存到数据库。
class CheckInCommand(val plate: Plate,val checkInTime: LocalDateTime
)@Component
class CheckInCommandHandler(private val parkingRepository: ParkingRepository
) {fun handle(eventQueue: EventQueue, command: CheckInCommand): Boolean {val parking = parkingRepository.findByIdOrError(command.plate)val result = parking.handle(eventQueue, command)parkingRepository.save(parking)return result}
}
聚合对象
聚合对象包含项目的核心功能。根据业务逻辑,处理各种命令等等,具体是在handle方法内处理,
handle方法的入参是一个队列,处理命令过程中产生的领域事件,就是放到这里。
interface Parking {fun handle(eventQueue: EventQueue, command: CheckInCommand): Booleanfun calculateFeeNow(now: LocalDateTime): Intfun handle(eventQueue: EventQueue, command: NotifyPayCommand)fun handle(eventQueue: EventQueue, command: CheckOutCommand): Boolean
}const val FEE_PER_HOUR = 1;class ParkingImpl(val id: Plate,var checkInTime: LocalDateTime? = null,var lastPlayTime: LocalDateTime? = null,var totalPaid: Int = 0
): Parking {override fun handle(eventQueue: EventQueue, command: CheckInCommand): Boolean {if (inPark()) {eventQueue.enqueue(CheckInFailedEvent(id, command.checkInTime))return false}eventQueue.enqueue(CheckedInEvent(id, command.checkInTime))this.checkInTime = command.checkInTimereturn true}override fun handle(eventQueue: EventQueue, command: NotifyPayCommand) {if (!inPark()) {throw DomainException("车辆不在场,不能付费")}lastPlayTime = command.payTimetotalPaid += command.amounteventQueue.enqueue(PaidEvent(plate = id, amount = command.amount, payTime = command.payTime))}override fun handle(eventQueue: EventQueue, command: CheckOutCommand): Boolean {if (!inPark()) {eventQueue.enqueue(CheckOutFailedEvent(plate = id, time = command.time, message = "车辆不在场"))return false}if (calculateFeeNow(command.time) > 0) {return false}this.checkInTime = nullthis.totalPaid = 0this.lastPlayTime= nulleventQueue.enqueue(CheckedOutEvent(plate = id, time = command.time))return true}override fun calculateFeeNow(now: LocalDateTime): Int {val currentCheckInTime = checkInTime ?: throw DomainException("车辆尚未入场")val lastPayTimeCurrent = lastPlayTime ?: return feeBetween(currentCheckInTime, now)if (totalPaid < feeBetween(currentCheckInTime, lastPayTimeCurrent)) {return feeBetween(currentCheckInTime, now) - totalPaid}if (lastPayTimeCurrent.plusMinutes(15).isAfter(now)) {return 0}return feeBetween(currentCheckInTime, now) - totalPaid}private fun feeBetween(start: LocalDateTime, end: LocalDateTime): Int {return hoursBetween(start, end) * FEE_PER_HOUR}private fun hoursBetween(start: LocalDateTime, end: LocalDateTime): Int {val minutes = Duration.between(start, end).toMinutes()val hours = minutes / 60return (if (hours * 60 == minutes) hours else hours + 1).toInt()}fun inPark(): Boolean {return checkInTime != null}
}
领域事件
class CheckedInEvent(val plate: Plate,val time: LocalDateTime
): DomainEvent
领域事件队列定义
在领域模块仅仅是定义,实现在领域外。
interface EventQueue {fun enqueue(event: DomainEvent)fun queue(): List<DomainEvent>
}
领域事件监听器定义
我觉得这个也可以定义到领域外模块主要是报警策略的定义用到了监听器,所以只能放领域内了。
interface DomainEventListener {fun onEvent(event: DomainEvent)
}
仓储定义
仓储在领域模块内仅仅是定义,实现在领域外模块。
interface ParkingRepository {fun findByIdOrError(plate: Plate): Parkingfun save(parking: Parking)
}
报警策略监听器
class AlarmPolicy(private val alarmService: AlarmService
) : DomainEventListener{override fun onEvent(event: DomainEvent) {if (event is CheckInFailedEvent) {alarmService.alarm(event.plate, "入场失败")return}if (event is CheckOutFailedEvent) {alarmService.alarm(event.plate, event.message)}}
}
报警策略定义
interface AlarmService {fun alarm(plate: Plate, message: String)
}
3.领域外模块
接口
将请求转成命令,CheckInReq->CheckInCommand,通过一个代理类,调用事件handler处理命令。
@Controller
class CheckInController(private val checkInCommandHandler: CheckInCommandHandler,private val commandInvoker: CommandInvoker
) {@MutationMappingfun checkIn(@Argument("req") req: CheckInReq): Boolean {return commandInvoker.invoke {checkInCommandHandler.handle(it,CheckInCommand(plate = Plate(req.plate),checkInTime = LocalDateTime.parse(req.time)))}}
}class CheckInReq(val plate: String,val time: String
)
命令代理类
使用代理类主要是为了简化代码。因为每个命令都是调相关的handler,把处理命令过程中的事件存储到事件eventQueue 队列中,最后调用时间分发器domainEventDispatcher把队列中的事件分发出去。每个命令的处理流程都是这几个步骤,且这几个步骤在一个事务之中,所以这里用了代理类。
interface CommandInvoker {fun <R> invoke(run: (EventQueue) -> R): R
}@Component
class OneTransactionCommandInvoker(transactionManager: PlatformTransactionManager,private val domainEventDispatcher: DomainEventDispatcher
) : CommandInvoker {private val transactionTemplate: TransactionTemplate = TransactionTemplate(transactionManager)override fun <R> invoke(run: (EventQueue) -> R): R {return transactionTemplate.execute { s ->val eventQueue = SimpleEventQueue()val result = run(eventQueue)this.domainEventDispatcher.dispatchNow(eventQueue)return@execute result} ?: throw IllegalStateException()}}
领域事件分发器
队列中的每一个事件,分发给每一个事件监听器。监听器会接收每一个事件,但只处理自己关心的事件,不关心的不处理。
@Component
class SyncDomainEventDispatcher(private val listeners: List<DomainEventListener>
) : DomainEventDispatcher {override fun dispatchNow(eventQueue: EventQueue) {eventQueue.queue().forEach { event ->listeners.forEach { listener ->listener.onEvent(event)}}}
}
领域事件队列实现
用一个LinkedList模拟消息队列,比较简单。
class SimpleEventQueue: EventQueue {private val queue: MutableList<DomainEvent> = LinkedList()override fun enqueue(event: DomainEvent) {queue.add(event)}override fun queue(): List<DomainEvent> {return queue}
}
领域事件监听器实现
监听不同的事件,并生产查询记录。用于CQRS架构查询模型查询。只处理自己关心的事件,不关心的不处理。
@Component
class ParkingHistoryMaterializer(private val parkingViewDao: ParkingViewDao
): DomainEventListener {override fun onEvent(event: DomainEvent) {when (event) {is CheckedInEvent -> insertHistory(event)is CheckedOutEvent -> updateOnCheckOut(event)is PaidEvent -> updateOnPaid(event)}}private fun updateOnPaid(event: PaidEvent) {val parkingViewRecord = parkingViewDao.findTopByPlateOrderByIdDesc(event.plate.value)parkingViewRecord.payAmount += event.amountparkingViewDao.save(parkingViewRecord)}private fun updateOnCheckOut(event: CheckedOutEvent) {val parkingViewRecord = parkingViewDao.findTopByPlateOrderByIdDesc(event.plate.value)parkingViewRecord.checkOutTime = event.timeparkingViewDao.save(parkingViewRecord)}private fun insertHistory(event: CheckedInEvent) {parkingViewDao.save(ParkingViewTable(id = null,plate = event.plate.value,checkInTime = event.time,checkOutTime = null,payAmount = 0))}
}
仓储接口实现
实现业务需要与数据库的交互的接口。
class ParkingRepositoryImpl(private val parkingDao: ParkingDao
): ParkingRepository {override fun findByIdOrError(plate: Plate): Parking {return parkingDao.findById(plate.value).map {ParkingImpl(id = plate,checkInTime = it.checkInTime,lastPlayTime = it.lastPlayTime,totalPaid = it.totalPaid)}.orElse(ParkingImpl(id = plate,checkInTime = null))}override fun save(parking: Parking) {if (parking !is ParkingImpl) {throw UnsupportedOperationException("不支持的类型: ${parking.javaClass}")}parkingDao.save(ParkingTable(id = parking.id.value,checkInTime = parking.checkInTime,lastPlayTime = parking.lastPlayTime,totalPaid = parking.totalPaid))}
}
报警策略实现
属于领域之外的模块,其定义AlarmService也是在领域模块内的。
@Component
class AlarmServiceDBImpl(private val alarmDao: AlarmDao
) : AlarmService {override fun alarm(plate: Plate, message: String) {alarmDao.save(AlarmTable(id = null,plate = plate.value,msg = message,time = LocalDateTime.now()))}
}
查询
项目用到了CQRS同事务存储分离的架构方式。这里就是查询功能,只是对数据库做一些统计,比较简单。
@Component
class ParkingHistoryMaterializer(private val parkingViewDao: ParkingViewDao
): DomainEventListener {override fun onEvent(event: DomainEvent) {when (event) {is CheckedInEvent -> insertHistory(event)is CheckedOutEvent -> updateOnCheckOut(event)is PaidEvent -> updateOnPaid(event)}}private fun updateOnPaid(event: PaidEvent) {val parkingViewRecord = parkingViewDao.findTopByPlateOrderByIdDesc(event.plate.value)parkingViewRecord.payAmount += event.amountparkingViewDao.save(parkingViewRecord)}private fun updateOnCheckOut(event: CheckedOutEvent) {val parkingViewRecord = parkingViewDao.findTopByPlateOrderByIdDesc(event.plate.value)parkingViewRecord.checkOutTime = event.timeparkingViewDao.save(parkingViewRecord)}private fun insertHistory(event: CheckedInEvent) {parkingViewDao.save(ParkingViewTable(id = null,plate = event.plate.value,checkInTime = event.time,checkOutTime = null,payAmount = 0))}
}