InterSystems IRIS supports publish and subscribe message delivery. Publish and subscribe refers to the technique of routing a message to one or more subscribers based on the fact that those subscribers have previously registered to be notified about messages on a specific topic.
This article demonstrates how several InterSystems IRIS capabilities can work together:
- Publish/Subscribe
- Workflow
- Alerts
- PPG (that's general capability though)
In this article we would send emails about:
- New workflow tasks
- Unassigned workflow tasks
- Uncompleted workflow tasks
- Ensemble alerts
Email recipients would be determined using Publish/Subscribe operation and each user would receive only digest email whenever possible.
Publish/Subscribe
InterSystems IRIS supports publish and subscribe message delivery. Publish and subscribe refers to the technique of routing a message to one or more subscribers based on the fact that those subscribers have previously registered to be notified about messages on a specific topic.
Before everything else two Publish/Subscribe domains (one for Workflow tasks and one for Ensemble Alerts) should be defined with the structure of topics, you need for your project. The process is described in the documentation.
Workflow
To test workflow you'll need workflow tasks. Demo.Workflow production could be used to generate test tasks. Article on Community about workflow.
Email Service
We start with a Service. It does most of the work:
- Queries workflow data to get New, Unassigned and Uncompleted tasks
- Calls Subscription operation to get recipients
- Writes task info and recipients into a temporary table Test.PPGEmail
- Forms digest email for each recipient
- Sends emails to email operation
It has several configurable parameters:
- ApprovalDomain - Subscriptions domain for approval tasks
- SendBO - Operation that would actually send new messages to subscribers (for example emails)
- SubscriptionBO - Subscription operation
- UnacceptedTime - Time (in hours) while task can be unaccepted without notification
- UncompletedTime - Time (in hours) a task can be uncompleted without notification
- URL - Base workflow portal url. Could be a dashboard url or a link to a custom portal, for example, Ensemble Workflow UI
Query workflow data
InterSystems IRIS workflow stores data in EnsLib_Workflow.TaskResponse table. Let's query it:
Query newWorkflowTasks(time) As %SQLQuery { SELECT TaskStatus_RoleName As topic, "%Subject" As text, "%Message" As message FROM EnsLib_Workflow.TaskResponse WHERE "%Status" = 'Unassigned' AND TaskStatus_TimeCreated >= :time ORDER BY TaskStatus_RoleName } Query unassignedWorkflowTasks(hours) As %SQLQuery { SELECT TaskStatus_RoleName As topic, "%Subject" As text, "%Message" As message FROM EnsLib_Workflow.TaskResponse WHERE "%Status" = 'Unassigned' AND DATEDIFF('hh', TaskStatus_TimeCreated, NOW()) >= :hours ORDER BY TaskStatus_RoleName } Query uncompletedWorkflowTasks(time) As %SQLQuery { SELECT TaskStatus_RoleName As topic, "%Subject" As text, "%Message" As message FROM EnsLib_Workflow.TaskResponse WHERE "%Status" = 'Assigned' AND DATEDIFF('hh', TaskStatus_TimeCreated, NOW()) >= :hours ORDER BY TaskStatus_RoleName } /// Populates Test.PPGEmail with "messages" to send Method generateWorkflowDataByType(type As %String(VALUELIST="new,unassigned,uncompleted")) As %Status { #dim sc As %Status = $$$OK set prevTopic = "" if type = "new" { set query = "newWorkflowTasks" set arg = ..getTime() } elseif type = "unassigned" { set query = "unassignedWorkflowTasks" set arg = ..UnacceptedTime } elseif type = "uncompleted" { set query = "uncompletedWorkflowTasks" set arg = ..UncompletedTime } #dim rs = $classmethod(,query _ "Func", arg) while rs.%Next() { set topic = rs.topic _ "." _ type if prevTopic'=topic { set emails = ..determineEmails(..ApprovalDomain, topic) set prevTopic = topic } set message = rs.message set:message'="" message = " - "_ message set sc = ##class(Test.PPGEmail).add(..ApprovalDomain, topic, emails, rs.text _ message) quit:$$$ISERR(sc) } quit sc } /// Get email addresses by domain and topic. Method determineEmails(domain As %String, topic As %String) As %List { set subRequest = ##class(EnsLib.PubSub.Request).%New() set subRequest.Topic = topic set subRequest.DomainName = domain do ..SendRequestSync(..SubscriptionBO, subRequest, .subResponse,, "Get subscribers for domain: " _ domain _ ", topic: " _ topic) set mails = "" for i=1:1:subResponse.TargetList.Count() { #dim target As EnsLib.PubSub.Target set target = subResponse.TargetList.GetAt(i) set mails = mails _ $lb(target.Address) } return mails }
In determineEmails method, Subscription operation is called to get a list of subscribers.
Test.PPGEmail
Query data populates temporary Test.PPGEmail table:
/// Class to store subscriptions in PPG Class Test.PPGEmail Extends %Persistent { /// Domain Property domain As %String(MAXLEN = 100); /// Topic Property topic As %String(MAXLEN = 1000) [ Required ]; /// Where to send current subscription. STORAGEDEFAULT and SQLPROJECTION allow SQL access Property emails As list Of %String(SQLPROJECTION = "table/column", STORAGEDEFAULT = "array"); // Not sure if it improves performance // Index emailsIndex On emails(ELEMENTS); /// Text Property text As %String(MAXLEN = ""); /// Add subscription /// w ##class(Test.PPGEmail).add("APPROVAL", "Contract approval", $lb("1@1.com","2@2.com"), "text") ClassMethod add(domain As %String, topic As %String, emails As %List, text As %String) As %Status { set obj = ..%New() set obj.domain = domain set obj.topic = topic set obj.text = text if $listvalid(emails) { for i=1:1:$ll(emails){ do obj.emails.Insert($lg(emails,i)) } } return obj.%Save() } Query flushTable() As %Query { SELECT * FROM Test.PPGEmail } /// !!! PPG storage Storage Default { <Data name="PPGEmailDefaultData"> <Value name="1"> <Value>%%CLASSNAME</Value> </Value> <Value name="2"> <Value>domain</Value> </Value> <Value name="3"> <Value>topic</Value> </Value> <Value name="4"> <Value>text</Value> </Value> </Data> <Data name="emails"> <Attribute>emails</Attribute> <Structure>subnode</Structure> <Subscript>"emails"</Subscript> </Data> <DataLocation>^||Test.PPGEmailD</DataLocation> <DefaultData>PPGEmailDefaultData</DefaultData> <Description> <![CDATA[!!! PPG storage]]> </Description> <IdLocation>^||Test.PPGEmailD</IdLocation> <IndexLocation>^||Test.PPGEmailI</IndexLocation> <StreamLocation>^||Test.PPGEmailS</StreamLocation> <Type>%Library.CacheStorage</Type> } }
There are two tricks here, first it's emails property:
Property emails As list Of %String(SQLPROJECTION = "table/column", STORAGEDEFAULT = "array");
SQLPROJECTION and STORAGEDEFAULT property parameters allow us to query it later by email (and especially GROUP BY email).
Second, we store data in process private globals so we can run several similar services without caring about concurrency.
Generating digest email
As each task has several recipients and each recipient has a lot of tasks we need to send digest emails.
/// Generate mails and send to BO Method sendEmails() { &sql(DECLARE C1 CURSOR FOR SELECT DISTINCT emails INTO :email FROM Test.PPGEmail_emails ) &sql(OPEN C1) &sql(FETCH C1) While (SQLCODE = 0) { set text = ..generateEmailText(email, ..url) set msg = ##class(Test.Alert).%New() set msg.topic = "Your tasks" set msg.to = email set msg.text = text if ..SendBO '="" { do ..SendRequestAsync(..SendBO, msg ,"Email with wf tasks for: " _ email) } &sql(FETCH C1) } &sql(CLOSE C1) }
With our storage schema for emails property getting distinct emails is very fast, next we iterate over recipients, build email for each and pass it to an email operation. Instead of email, it could be a Telegram Operation or anything else really.
To get a list of tasks by an email we can use FOR SOME %ELEMENT syntax:
SELECT "domain", topic, text FROM Test.PPGEmail WHERE FOR SOME %ELEMENT(emails) (%VALUE=:email) ORDER BY "domain", topic
Email Operation
Finally, everything is ready to send alerts. The operation receives either our Test.Alert message or InterSystems IRIS alert and sends email to a list of recipients. For InterSystems IRIS alerts, it also gets subscribers from Publish operation. Check the code for sources, but there's nothing interesting there to justify copy&pasting code into this article.
Conclusion
InterSystems IRIS can implement various Publish/Subscribe flows. Digest emails can be easily generated using various storage mechanisms, allowing for M:N relationships between data and recipients.
Hey
Its really hard to gather any flow or re-use.
Did you made any operation that uses Ens.PubSubRouting?
Both Service and Operation presented in the article use EnsLib.PubSub.PubSubOperation to get subscribers.
Here's getting a list of emails by domain and topic:
Ok. Let me import the full code and take a look.
In case you are on skype , add me pls: https://join.skype.com/invite/Nqr7BaOpnh2o
Neerav, pls see your pub/sub post. I left my comments there last week. Look forward having your code implementation to understand pub/sub processing.
Thanks