ABP目前已经是很成熟的开发框架了,它提供了很多我们日常开发所必需的功能,并且很方便扩展,让我们能更专注于业务的开发。但是ABP官方并没有给我们实现工作流。
在.net core环境下的开源工作流引擎很少,其中WorkflowCore是一款轻量级工作流引擎,对于小型工作流和责任链类型的需求开发很适合,但只能通过后台编码或者json的方式定义工作流程,看了源码后觉得扩展性还是挺好的,至少能满足我的需求,于是选择对它下手。
jsPlumb是一个开源的比较强大的绘图组件,这里不多介绍,我就是用它实现一个简单的流程设计器。
花了差不多一个月的时间,把这三者结合到一起实现一个简单而强大的工作流模块。
目录:
ABP中AbpWorkflow和AbpStepBody的自定义注册
设计器实现
设计器提交的流程数据转换成WorkflowCore支持的Json数据结构
总结
1.ABP模块实现WorkflowCore持久化存储接口(IPersistenceProvider)
这里我参考了WorkflowCore.Persistence.EntityFramework持久化项目的实现方式,用ABP的方式实现了WorkflowCore的持久化。这样做有两个好处:
1.让工作流能支持ABP的多租户和全局数据过滤功能
2.数据库操作能使用统一的数据上下文,方便事务提交和回滚。
下面是用ABP实现WorkflowCore持久化存储所必需的实体类,其中PersistedWorkflowDefinition用来持久化存储流程定义(在WorkflowCore中流程定义默认保存在内存中),如下图:
实现IPersistenceProvider接口
/// <summary>
/// WorkflowCore persistence contract extended with direct access to the
/// persisted entities (workflow, execution pointer and workflow definition).
/// </summary>
public interface IAbpPersistenceProvider : IPersistenceProvider
{
    Task<PersistedWorkflow> GetPersistedWorkflow(Guid id);

    Task<PersistedExecutionPointer> GetPersistedExecutionPointer(string id);

    Task<PersistedWorkflowDefinition> GetPersistedWorkflowDefinition(string id, int version);
}


/// <summary>
/// <see cref="IPersistenceProvider"/> implementation that stores workflow data
/// through ABP repositories and saves through the current ABP unit of work.
/// </summary>
public class AbpPersistenceProvider : DomainService, IAbpPersistenceProvider
{
    protected readonly IRepository<PersistedEvent, Guid> _eventRepository;
    protected readonly IRepository<PersistedExecutionPointer, string> _executionPointerRepository;
    protected readonly IRepository<PersistedWorkflow, Guid> _workflowRepository;
    protected readonly IRepository<PersistedWorkflowDefinition, string> _workflowDefinitionRepository;
    protected readonly IRepository<PersistedSubscription, Guid> _eventSubscriptionRepository;
    protected readonly IRepository<PersistedExecutionError, Guid> _executionErrorRepository;
    protected readonly IGuidGenerator _guidGenerator;
    protected readonly IAsyncQueryableExecuter _asyncQueryableExecuter;

    /// <summary>Current session; read when a new workflow is created.</summary>
    public IAbpSession AbpSession { get; set; }

    public AbpPersistenceProvider(
        IRepository<PersistedEvent, Guid> eventRepository,
        IRepository<PersistedExecutionPointer, string> executionPointerRepository,
        IRepository<PersistedWorkflow, Guid> workflowRepository,
        IRepository<PersistedSubscription, Guid> eventSubscriptionRepository,
        IGuidGenerator guidGenerator,
        IAsyncQueryableExecuter asyncQueryableExecuter,
        IRepository<PersistedExecutionError, Guid> executionErrorRepository,
        IRepository<PersistedWorkflowDefinition, string> workflowDefinitionRepository)
    {
        _eventRepository = eventRepository;
        _executionPointerRepository = executionPointerRepository;
        _workflowRepository = workflowRepository;
        _eventSubscriptionRepository = eventSubscriptionRepository;
        _guidGenerator = guidGenerator;
        _asyncQueryableExecuter = asyncQueryableExecuter;
        _executionErrorRepository = executionErrorRepository;
        _workflowDefinitionRepository = workflowDefinitionRepository;
    }

    /// <summary>
    /// Assigns a freshly generated id to <paramref name="subscription"/>, inserts it and returns the id.
    /// </summary>
    [UnitOfWork]
    public virtual async Task<string> CreateEventSubscription(EventSubscription subscription)
    {
        subscription.Id = _guidGenerator.Create().ToString();
        var persistable = subscription.ToPersistable();
        await _eventSubscriptionRepository.InsertAsync(persistable);
        return subscription.Id;
    }

    /// <summary>
    /// Assigns a freshly generated id to <paramref name="workflow"/>, inserts it and returns the id.
    /// When the session has a user id, the user's full name is stored on the persisted workflow.
    /// </summary>
    [UnitOfWork]
    public virtual async Task<string> CreateNewWorkflow(WorkflowInstance workflow)
    {
        workflow.Id = _guidGenerator.Create().ToString();
        var persistable = workflow.ToPersistable();
        if (AbpSession.UserId.HasValue)
        {
            var userCache = AbpSession.GetCurrentUser();
            persistable.CreateUserIdentityName = userCache.FullName;
        }

        await _workflowRepository.InsertAsync(persistable);
        return workflow.Id;
    }

    /// <summary>
    /// Returns the ids of workflows in <see cref="WorkflowStatus.Runnable"/> status whose
    /// next execution (stored as ticks) is at or before <paramref name="asAt"/> in UTC.
    /// </summary>
    [UnitOfWork]
    public virtual async Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt)
    {
        var now = asAt.ToUniversalTime().Ticks;

        var query = _workflowRepository.GetAll()
            .Where(x => x.NextExecution.HasValue && (x.NextExecution <= now) && (x.Status == WorkflowStatus.Runnable))
            .Select(x => x.Id);
        var raw = await _asyncQueryableExecuter.ToListAsync(query);

        return raw.Select(s => s.ToString()).ToList();
    }

    /// <summary>
    /// Returns a page of workflow instances; every filter argument that is null/empty is ignored.
    /// </summary>
    [UnitOfWork]
    public virtual async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take)
    {
        IQueryable<PersistedWorkflow> query = WorkflowsWithPointers();

        if (status.HasValue)
        {
            query = query.Where(x => x.Status == status.Value);
        }

        if (!string.IsNullOrEmpty(type))
        {
            query = query.Where(x => x.WorkflowDefinitionId == type);
        }

        if (createdFrom.HasValue)
        {
            query = query.Where(x => x.CreateTime >= createdFrom.Value);
        }

        if (createdTo.HasValue)
        {
            query = query.Where(x => x.CreateTime <= createdTo.Value);
        }

        var rawResult = await query.Skip(skip).Take(take).ToListAsync();

        return rawResult.Select(item => item.ToWorkflowInstance()).ToList();
    }

    /// <summary>
    /// Loads a single workflow instance together with its execution pointers.
    /// </summary>
    /// <exception cref="InvalidOperationException">No workflow with the given id exists.</exception>
    [UnitOfWork]
    public virtual async Task<WorkflowInstance> GetWorkflowInstance(string Id)
    {
        var uid = new Guid(Id);

        // FirstAsync throws when nothing matches, so no null check is needed afterwards.
        var raw = await WorkflowsWithPointers().FirstAsync(x => x.Id == uid);

        return raw.ToWorkflowInstance();
    }

    /// <summary>
    /// Loads the workflow instances with the given ids; a null id list yields an empty result.
    /// </summary>
    [UnitOfWork]
    public virtual async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(IEnumerable<string> ids)
    {
        if (ids == null)
        {
            return new List<WorkflowInstance>();
        }

        // Materialise once so the id list is parsed a single time before it reaches the query.
        var uids = ids.Select(i => new Guid(i)).ToList();
        var raw = await WorkflowsWithPointers()
            .Where(x => uids.Contains(x.Id))
            .ToListAsync();

        return raw.Select(i => i.ToWorkflowInstance()).ToList();
    }

    /// <summary>
    /// Loads the tracked entity for <paramref name="workflow"/>, maps the instance onto it and saves.
    /// </summary>
    /// <exception cref="InvalidOperationException">The workflow has not been persisted yet.</exception>
    [UnitOfWork]
    public virtual async Task PersistWorkflow(WorkflowInstance workflow)
    {
        var uid = new Guid(workflow.Id);
        var existingEntity = await WorkflowsWithPointers()
            .Where(x => x.Id == uid)
            .AsTracking()
            .FirstAsync();

        // NOTE(review): ToPersistable(existing) presumably copies the instance state onto the
        // tracked entity; its return value is not needed here.
        workflow.ToPersistable(existingEntity);
        await CurrentUnitOfWork.SaveChangesAsync();
    }

    /// <summary>
    /// Deletes the subscription with the given id; does nothing when it no longer exists.
    /// </summary>
    [UnitOfWork]
    public virtual async Task TerminateSubscription(string eventSubscriptionId)
    {
        var uid = new Guid(eventSubscriptionId);
        var existing = await _eventSubscriptionRepository.FirstOrDefaultAsync(x => x.Id == uid);
        if (existing == null)
        {
            // FirstOrDefaultAsync found nothing: there is no entity to hand to the repository.
            return;
        }

        await _eventSubscriptionRepository.DeleteAsync(existing);
        await CurrentUnitOfWork.SaveChangesAsync();
    }

    /// <summary>Intentionally empty: this provider performs no store initialisation.</summary>
    [UnitOfWork]
    public virtual void EnsureStoreExists()
    {
    }

    /// <summary>
    /// Returns the subscriptions for the given event name/key that subscribed at or before
    /// <paramref name="asOf"/> (converted to UTC).
    /// </summary>
    [UnitOfWork]
    public virtual async Task<IEnumerable<EventSubscription>> GetSubscriptions(string eventName, string eventKey, DateTime asOf)
    {
        asOf = asOf.ToUniversalTime();
        var raw = await _eventSubscriptionRepository.GetAll()
            .Where(x => x.EventName == eventName && x.EventKey == eventKey && x.SubscribeAsOf <= asOf)
            .ToListAsync();

        return raw.Select(item => item.ToEventSubscription()).ToList();
    }

    /// <summary>
    /// Assigns a freshly generated id to <paramref name="newEvent"/>, inserts and saves it, and returns the id.
    /// </summary>
    [UnitOfWork]
    public virtual async Task<string> CreateEvent(Event newEvent)
    {
        newEvent.Id = _guidGenerator.Create().ToString();
        var persistable = newEvent.ToPersistable();

        // The insert must be awaited: otherwise failures are never observed and
        // SaveChangesAsync may run before the insert has completed.
        await _eventRepository.InsertAsync(persistable);
        await CurrentUnitOfWork.SaveChangesAsync();
        return newEvent.Id;
    }

    /// <summary>Returns the event with the given id, or null when it does not exist.</summary>
    [UnitOfWork]
    public virtual async Task<Event> GetEvent(string id)
    {
        var uid = new Guid(id);
        var raw = await _eventRepository.FirstOrDefaultAsync(x => x.Id == uid);

        return raw?.ToEvent();
    }

    /// <summary>
    /// Returns the ids of unprocessed events whose event time is at or before
    /// <paramref name="asAt"/> (converted to UTC).
    /// </summary>
    [UnitOfWork]
    public virtual async Task<IEnumerable<string>> GetRunnableEvents(DateTime asAt)
    {
        var now = asAt.ToUniversalTime();
        var raw = await _eventRepository.GetAll()
            .Where(x => !x.IsProcessed)
            .Where(x => x.EventTime <= now)
            .Select(x => x.Id)
            .ToListAsync();

        return raw.Select(s => s.ToString()).ToList();
    }

    /// <summary>Flags the event with the given id as processed and saves.</summary>
    /// <exception cref="InvalidOperationException">No event with the given id exists.</exception>
    [UnitOfWork]
    public virtual async Task MarkEventProcessed(string id)
    {
        await SetEventProcessedAsync(id, true);
    }

    /// <summary>
    /// Returns the ids of events with the given name/key whose event time is at or after <paramref name="asOf"/>.
    /// </summary>
    [UnitOfWork]
    public virtual async Task<IEnumerable<string>> GetEvents(string eventName, string eventKey, DateTime asOf)
    {
        var raw = await _eventRepository.GetAll()
            .Where(x => x.EventName == eventName && x.EventKey == eventKey)
            .Where(x => x.EventTime >= asOf)
            .Select(x => x.Id)
            .ToListAsync();

        return raw.Select(s => s.ToString()).ToList();
    }

    /// <summary>Flags the event with the given id as not processed and saves.</summary>
    /// <exception cref="InvalidOperationException">No event with the given id exists.</exception>
    [UnitOfWork]
    public virtual async Task MarkEventUnprocessed(string id)
    {
        await SetEventProcessedAsync(id, false);
    }

    /// <summary>Inserts every error in <paramref name="errors"/> and saves; an empty sequence is a no-op.</summary>
    [UnitOfWork]
    public virtual async Task PersistErrors(IEnumerable<ExecutionError> errors)
    {
        // Enumerate the sequence only once.
        var executionErrors = errors as ExecutionError[] ?? errors.ToArray();
        if (executionErrors.Length == 0)
        {
            return;
        }

        foreach (var error in executionErrors)
        {
            await _executionErrorRepository.InsertAsync(error.ToPersistable());
        }

        await CurrentUnitOfWork.SaveChangesAsync();
    }

    /// <summary>Returns the subscription with the given id, or null when it does not exist.</summary>
    [UnitOfWork]
    public virtual async Task<EventSubscription> GetSubscription(string eventSubscriptionId)
    {
        var uid = new Guid(eventSubscriptionId);
        var raw = await _eventSubscriptionRepository.FirstOrDefaultAsync(x => x.Id == uid);

        return raw?.ToEventSubscription();
    }

    /// <summary>
    /// Returns the first subscription for the given event name/key that subscribed at or before
    /// <paramref name="asOf"/> and carries no external token, or null when there is none.
    /// </summary>
    [UnitOfWork]
    public virtual async Task<EventSubscription> GetFirstOpenSubscription(string eventName, string eventKey, DateTime asOf)
    {
        var raw = await _eventSubscriptionRepository.FirstOrDefaultAsync(x =>
            x.EventName == eventName
            && x.EventKey == eventKey
            && x.SubscribeAsOf <= asOf
            && x.ExternalToken == null);

        return raw?.ToEventSubscription();
    }

    /// <summary>
    /// Stores the external token, worker id and token expiry on the subscription and saves.
    /// </summary>
    /// <returns>Always <c>true</c>.</returns>
    /// <exception cref="InvalidOperationException">No subscription with the given id exists.</exception>
    [UnitOfWork]
    public virtual async Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry)
    {
        var existingEntity = await GetTrackedSubscriptionAsync(eventSubscriptionId);

        existingEntity.ExternalToken = token;
        existingEntity.ExternalWorkerId = workerId;
        existingEntity.ExternalTokenExpiry = expiry;
        await CurrentUnitOfWork.SaveChangesAsync();

        return true;
    }

    /// <summary>
    /// Clears the external token, worker id and token expiry of the subscription and saves.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// No subscription with the given id exists, or its stored token differs from <paramref name="token"/>.
    /// </exception>
    [UnitOfWork]
    public virtual async Task ClearSubscriptionToken(string eventSubscriptionId, string token)
    {
        var existingEntity = await GetTrackedSubscriptionAsync(eventSubscriptionId);

        if (existingEntity.ExternalToken != token)
        {
            throw new InvalidOperationException();
        }

        existingEntity.ExternalToken = null;
        existingEntity.ExternalWorkerId = null;
        existingEntity.ExternalTokenExpiry = null;
        await CurrentUnitOfWork.SaveChangesAsync();
    }

    public Task<PersistedWorkflow> GetPersistedWorkflow(Guid id)
    {
        return _workflowRepository.GetAsync(id);
    }

    public Task<PersistedWorkflowDefinition> GetPersistedWorkflowDefinition(string id, int version)
    {
        return _workflowDefinitionRepository.GetAll()
            .AsNoTracking()
            .FirstOrDefaultAsync(u => u.Id == id && u.Version == version);
    }

    public Task<PersistedExecutionPointer> GetPersistedExecutionPointer(string id)
    {
        return _executionPointerRepository.GetAsync(id);
    }

    /// <summary>Workflow query that eagerly loads execution pointers and their extension attributes.</summary>
    private IQueryable<PersistedWorkflow> WorkflowsWithPointers()
    {
        return _workflowRepository.GetAll()
            .Include(wf => wf.ExecutionPointers)
            .ThenInclude(ep => ep.ExtensionAttributes);
    }

    /// <summary>Loads the tracked event with the given id, sets its processed flag and saves.</summary>
    private async Task SetEventProcessedAsync(string id, bool isProcessed)
    {
        var uid = new Guid(id);
        var existingEntity = await _eventRepository.GetAll()
            .Where(x => x.Id == uid)
            .AsTracking()
            .FirstAsync();

        existingEntity.IsProcessed = isProcessed;
        await CurrentUnitOfWork.SaveChangesAsync();
    }

    /// <summary>Loads the tracked subscription with the given id; throws when it does not exist.</summary>
    private Task<PersistedSubscription> GetTrackedSubscriptionAsync(string eventSubscriptionId)
    {
        var uid = new Guid(eventSubscriptionId);
        return _eventSubscriptionRepository.GetAll()
            .Where(x => x.Id == uid)
            .AsTracking()
            .FirstAsync();
    }
}