因为微软的Workflow Foundation里对WorkflowPersistenceService只提供了一个用于SqlServer的实现,对于想用别的方式保存工作流状态的应用就需要自己实现WorkflowPersistenceService了,今天自己写了个用于MySQL的实现,应该说是一个最简陋的实现了,只完成了最简单的保存和恢复的功能,高级点儿的加锁之类的功能还没弄明白。不过我想这些对学习WF的朋友应该也能帮上点儿忙。
数据库只有三个字段:
id varchar(36) Workflow Guid state blob Workflow State Serialized status tinyint Workflow Status
代码如下:
using System; using System.Collections.Generic; using System.Data; using System.Data.Common; using System.Workflow.ComponentModel; using System.Workflow.Runtime; using System.Workflow.Runtime.Hosting; using Microsoft.Practices.EnterpriseLibrary.Data; namespace Ganji.OA.Workflow { public class MysqlWorkflowPersistenceService : WorkflowPersistenceService { public MysqlWorkflowPersistenceService() : base() {} public List GetAllWorkflows() { List result = new List(); Database db = DatabaseFactory.CreateDatabase("OAData"); DbCommand command = db.GetSqlStringCommand("SELECT id FROM workflow_state"); DbDataReader reader = command.ExecuteReader(); while (reader.Read()) { result.Add(new Guid((string)reader["id"])); } reader.Close(); return result; } protected override Activity LoadCompletedContextActivity(Guid scopeId, Activity outerActivity) { Guid instanceId = WorkflowEnvironment.WorkflowInstanceId; Activity activity = Deserialize(instanceId, scopeId, outerActivity); return activity; } protected override Activity LoadWorkflowInstanceState(Guid instanceId) { Activity activity = Deserialize(instanceId, Guid.Empty, null); return activity; } protected override void SaveCompletedContextActivity(Activity activity) { Guid instanceId = WorkflowEnvironment.WorkflowInstanceId; Guid contextId = (Guid)activity.GetValue(Activity.ActivityContextGuidProperty); Serialize(instanceId, contextId, activity); } protected override void SaveWorkflowInstanceState(Activity rootActivity, bool unlock) { WorkflowStatus status = GetWorkflowStatus(rootActivity); Guid instanceId = WorkflowEnvironment.WorkflowInstanceId; if (status == WorkflowStatus.Terminated || status == WorkflowStatus.Completed) { DeleteWorkflow(instanceId); } else { Serialize(instanceId, Guid.Empty, rootActivity); } } protected override bool UnloadOnIdle(Activity activity) { return true; } protected override void UnlockWorkflowInstanceState(Activity rootActivity) { } private void Serialize(Guid instanceId, Guid contextId, Activity activity) { byte[] state = WorkflowPersistenceService.GetDefaultSerializedForm(activity); Database db = DatabaseFactory.CreateDatabase("OAData"); String sql_del = "DELETE FROM workflow_state WHERE id=@id"; DbCommand command_del = db.GetSqlStringCommand(sql_del); db.AddInParameter(command_del, "@id", DbType.String, instanceId.ToString()); String sql = "INSERT INTO workflow_state (`id`, `state`, `status`) VALUES (@id, @state, @status)"; DbCommand command = db.GetSqlStringCommand(sql); db.AddInParameter(command, "@id", DbType.String, instanceId.ToString()); db.AddInParameter(command, "@state", DbType.Binary, state); db.AddInParameter(command, "@status", DbType.Byte, 0); using (DbConnection conn = db.CreateConnection()) { conn.Open(); db.ExecuteNonQuery(command_del); db.ExecuteNonQuery(command); conn.Close(); } } private Activity Deserialize(Guid instanceId, Guid contextId, Activity outerActivity) { Database db = DatabaseFactory.CreateDatabase("OAData"); string sql = "SELECT * FROM workflow_state WHERE id='" + instanceId.ToString() + "'"; Activity activity = null; using (DbDataReader reader = db.ExecuteReader(CommandType.Text, sql) as DbDataReader) { if (reader.HasRows) { reader.Read(); byte[] state = (byte[])reader["state"]; reader.Close(); activity = WorkflowPersistenceService.RestoreFromDefaultSerializedForm(state, outerActivity); } } return activity; } private void DeleteWorkflow(Guid instanceId) { Database db = DatabaseFactory.CreateDatabase("OAData"); string sql = "DELETE FROM workflow_state WHERE id=@id"; DbCommand command = db.GetSqlStringCommand(sql); db.AddInParameter(command, "@id", DbType.String, instanceId.ToString()); using (DbConnection conn = db.CreateConnection()) { conn.Open(); command.ExecuteNonQuery(); conn.Close(); } } } }
重要更正: 在Pro WF书里说保存和恢复Activity的状态是用的Activity对象的Save和Load方法,这样保存的对象信息有六十多KB,还只是对于一个只有5个状态的小工作流,这个尺寸放在MySQL的blob里都放不下,造成了运行时一开始可以正常的存入工作流状态,但状态变更后,工作流序列化后的对象增大,保存进数据库的时候被截断了,造成了下次从数据库取得状态时报一个“在分析完成之前就遇到流结尾。”的错误,浪费了近一天的时间去解决这个问题。最终在MSDN的一篇文章的一小段例子里,看到它恢复Activity状态用的是WorkflowPersistenceService对象的这个方法
protected static Activity RestoreFromDefaultSerializedForm( byte[] activityBytes, Activity outerActivity )
相应的保存Activity状态的方法是: protected static byte[] GetDefaultSerializedForm( Activity activity ) 这一对方法对序列化的状态用GZIP进行了压缩,上面六十多KB的数据现在只有13K左右了。
protected static byte[] GetDefaultSerializedForm( Activity activity )
这一对方法对序列化的状态用GZIP进行了压缩,上面六十多KB的数据现在只有13K左右了。
Your email is never published nor shared. Required fields are marked *
You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>
<a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>
用于MySQL的WF WorkflowPersistenceService
因为微软的Workflow Foundation里对WorkflowPersistenceService只提供了一个用于SqlServer的实现,对于想用别的方式保存工作流状态的应用就需要自己实现WorkflowPersistenceService了,今天自己写了个用于MySQL的实现,应该说是一个最简陋的实现了,只完成了最简单的保存和恢复的功能,高级点儿的加锁之类的功能还没弄明白。不过我想这些对学习WF的朋友应该也能帮上点儿忙。
数据库只有三个字段:
代码如下:
using System; using System.Collections.Generic; using System.Data; using System.Data.Common; using System.Workflow.ComponentModel; using System.Workflow.Runtime; using System.Workflow.Runtime.Hosting; using Microsoft.Practices.EnterpriseLibrary.Data; namespace Ganji.OA.Workflow { public class MysqlWorkflowPersistenceService : WorkflowPersistenceService { public MysqlWorkflowPersistenceService() : base() {} public List GetAllWorkflows() { List result = new List(); Database db = DatabaseFactory.CreateDatabase("OAData"); DbCommand command = db.GetSqlStringCommand("SELECT id FROM workflow_state"); DbDataReader reader = command.ExecuteReader(); while (reader.Read()) { result.Add(new Guid((string)reader["id"])); } reader.Close(); return result; } protected override Activity LoadCompletedContextActivity(Guid scopeId, Activity outerActivity) { Guid instanceId = WorkflowEnvironment.WorkflowInstanceId; Activity activity = Deserialize(instanceId, scopeId, outerActivity); return activity; } protected override Activity LoadWorkflowInstanceState(Guid instanceId) { Activity activity = Deserialize(instanceId, Guid.Empty, null); return activity; } protected override void SaveCompletedContextActivity(Activity activity) { Guid instanceId = WorkflowEnvironment.WorkflowInstanceId; Guid contextId = (Guid)activity.GetValue(Activity.ActivityContextGuidProperty); Serialize(instanceId, contextId, activity); } protected override void SaveWorkflowInstanceState(Activity rootActivity, bool unlock) { WorkflowStatus status = GetWorkflowStatus(rootActivity); Guid instanceId = WorkflowEnvironment.WorkflowInstanceId; if (status == WorkflowStatus.Terminated || status == WorkflowStatus.Completed) { DeleteWorkflow(instanceId); } else { Serialize(instanceId, Guid.Empty, rootActivity); } } protected override bool UnloadOnIdle(Activity activity) { return true; } protected override void UnlockWorkflowInstanceState(Activity rootActivity) { } private void Serialize(Guid instanceId, Guid contextId, Activity activity) { byte[] state = WorkflowPersistenceService.GetDefaultSerializedForm(activity); Database db = DatabaseFactory.CreateDatabase("OAData"); String sql_del = "DELETE FROM workflow_state WHERE id=@id"; DbCommand command_del = db.GetSqlStringCommand(sql_del); db.AddInParameter(command_del, "@id", DbType.String, instanceId.ToString()); String sql = "INSERT INTO workflow_state (`id`, `state`, `status`) VALUES (@id, @state, @status)"; DbCommand command = db.GetSqlStringCommand(sql); db.AddInParameter(command, "@id", DbType.String, instanceId.ToString()); db.AddInParameter(command, "@state", DbType.Binary, state); db.AddInParameter(command, "@status", DbType.Byte, 0); using (DbConnection conn = db.CreateConnection()) { conn.Open(); db.ExecuteNonQuery(command_del); db.ExecuteNonQuery(command); conn.Close(); } } private Activity Deserialize(Guid instanceId, Guid contextId, Activity outerActivity) { Database db = DatabaseFactory.CreateDatabase("OAData"); string sql = "SELECT * FROM workflow_state WHERE id='" + instanceId.ToString() + "'"; Activity activity = null; using (DbDataReader reader = db.ExecuteReader(CommandType.Text, sql) as DbDataReader) { if (reader.HasRows) { reader.Read(); byte[] state = (byte[])reader["state"]; reader.Close(); activity = WorkflowPersistenceService.RestoreFromDefaultSerializedForm(state, outerActivity); } } return activity; } private void DeleteWorkflow(Guid instanceId) { Database db = DatabaseFactory.CreateDatabase("OAData"); string sql = "DELETE FROM workflow_state WHERE id=@id"; DbCommand command = db.GetSqlStringCommand(sql); db.AddInParameter(command, "@id", DbType.String, instanceId.ToString()); using (DbConnection conn = db.CreateConnection()) { conn.Open(); command.ExecuteNonQuery(); conn.Close(); } } } }重要更正: 在Pro WF书里说保存和恢复Activity的状态是用的Activity对象的Save和Load方法,这样保存的对象信息有六十多KB,还只是对于一个只有5个状态的小工作流,这个尺寸放在MySQL的blob里都放不下,造成了运行时一开始可以正常的存入工作流状态,但状态变更后,工作流序列化后的对象增大,保存进数据库的时候被截断了,造成了下次从数据库取得状态时报一个“在分析完成之前就遇到流结尾。”的错误,浪费了近一天的时间去解决这个问题。最终在MSDN的一篇文章的一小段例子里,看到它恢复Activity状态用的是WorkflowPersistenceService对象的这个方法