Integrazione MQ e DB2
Con
la versione IBM i 7.1 il DB2 permette strette interazioni con MQ (Websphere MQ
Series), rendendo, di fatto, molto semplici le procedure d’integrazione tra i
due prodotti.
Websphere MQ
Per
chi non conosce MQ qualche coordinata per collocarlo nel panorama IT:
IBM
Websphere MQ è un software (middleware) che permette la comunicazione certa via
rete tra piattaforme anche eterogenee e applicazioni potenzialmente non
concorrenti.
Le
keyword in questa semplice definizione sono
·
Comunicazione: è un prodotto che permette di
inviare e ricevere informazioni anche non strutturate, denominate Messaggi.
·
Certa: c’è la certezza che di un messaggio
esista una sola istanza, o nel sistema di trasmissione o in quello di
ricezione: non esiste il caso dubbio di messaggi duplicati.
·
Piattaforme eterogenee: innumerevoli piattaforme
server e altrettante client che possono condividere l’architettura, i comandi e
le configurazioni del prodotto, gestendo in maniera trasparente i diversi
encoding (es conversioni ASCII – EBCDIC)
·
Applicazioni non concorrenti: l’invio di un
messaggio è garantito: è il prodotto che si occupa dello strato di
comunicazione, dei retry in caso di fail o di indisponibilità di uno dei
sistemi partner, garantendo anche il disaccoppiamento tra sistemi.
In
Websphere MQ una destinazione è detta MessageQueue e ogni MessageQueue è
catalogata in un QueueManager. Le attività basilari sono quindi l’inserimento
di un messaggio in una coda e la lettura di un messaggio da una coda tramite
un’interfaccia applicativa.

DB2 e MQ
Le funzioni disponibili dalla 7.1 permettono di
Leggere da una coda senza <consumare> il messaggio
(Browsing)
Leggere da una coda
Inviare messaggi
Le tre possibili funzioni sono implementate sia come
funzioni scalari - UDF (interazione con un solo messaggio per volta) sia come table-functions
UDTF (interazione con set di messaggi)
|
UDF – Scalar Functions
|
|
|
MQREAD
|
Legge un messaggio in una
colonna VarChar senza consumarlo
|
|
MQREADCLOB
|
Legge un messaggio in una colonna CLob senza consumarlo
|
|
MQRECEIVE
|
Legge un messaggio in una
colonna VarChar e lo rimuove dalla coda
|
|
MQRECEIVECLOB
|
Legge un messaggio in una colonna CLob e lo rimuove
dalla coda
|
|
MQSEND
|
Inserisce un messaggio in coda
|
|
UDTF – Table Functions
|
|
|
MQREADALL
|
Legge un set di messaggi in
una colonna VarChar senza consumarli
|
|
MQREADALLCLOB
|
Legge un set di messaggi in una colonna Clob senza
consumarli
|
|
MQRECEIVEALL
|
Legge un set di messaggi in
una colonna VarChar rimuovendoli dalla coda
|
|
MQRECEIVEALLCLOB
|
Legge un set di messaggi in una colonna Clob
rimuovendoli dalla coda
|
E’ interessante l’implementazione delle funzioni READ e
RECEIVE: operano in maniera simile
leggendo messaggi da una coda, differenziandosi nella modalità di browse (esame
senza “consumo”) e get pura (consumando quindi il messaggio)
Poiché i parametri per scrivere e leggere su una coda
possono essere veramente i più vari è stato scelta una modalità di
parametrizzazione che semplifica molto lo statement SQL, delegando l’insieme
dei parametri e delle configurazioni possibili a due tabelle gestite
dall’utente.
|
Definizioni
|
Nome Tabella
|
Default
|
Contenuti
|
|
MQ services
|
SYSIBM.MQSERVICE
|
DB2.DEFAULT.SERVICE
|
Definisce QManager e Coda con
la quale si ha interazione
|
|
MQ policy
|
SYSIBM.MQPOLICY
|
DB2.DEFAULT.POLICY
|
Definisce come i messaggi vengono trattati, in lettura
e scrittura
|
I servizi creano un riferimento incrociato tra un nome (da utilizzare nella function) e
la reale coda e qmanager; in questo modo l’amministratore del sistema maschera
la complessità della configurazione fornendo un nome simbolico (SERVICENAME)
|
Colonna
|
Descrizione
|
|
SERVICENAME
|
PrimaryKey della tabella,
contiene il nome simbolico del servizio
|
|
QUEUEMANAGER
|
Nome del Queue Manager, opzionale: se non specificato
verrà utilizzato il QManager di default (se configurato)
|
|
INPUTQUEUE
|
nome della coda
|
|
CODEDCHARSET
|
CCSID dei dati del messaggio da inviare/ricevere; il
valore speciale -3 istruisce la funzione ad utilizzare il JOBCCSID
|
|
ENCODING
|
Definisce il tipo di encoding
MQ, lasciare a 0
|
|
DESCRIPTION
|
Descrizione utente
|
Le
policy definiscono, per esempio, se il sistema dovrà essere transazionale, i
millisecondi di retry in caso di errore, la possibilità di accettare o meno
messaggi eccedenti la dimensione della variabile destinata ad accoglierli;
normalmente sono sufficienti pochi record di configurazione per coprire le
proprie esigenze.
E’
compito di chi configura il sistema inserire righe nelle due tabelle per poter
fruire dei servizi.
In questa tabelle elenco le colonne più importanti
|
Colonna
|
Descrizione
|
|
POLICYNAME
|
PrimaryKey della tabella,
contiene il nome simbolico del servizio
|
|
SEND_PRIORITY
|
Priorità del messaggio, default -1
|
|
SEND_RETRY_COUNT
|
numero dei tentativi in caso di fail della
procedura (default 5)
|
|
SEND_RETRY_INTERVAL
|
Intervallo in ms tra un tentativo e l’altro (default
1000)
|
|
SEND_MSG_TYPE
|
Determina il tipo
(formattazione) di messaggio, normalmente DTG = Datagramma
|
|
RCV_WAIT_INTERVAL
|
Specifica il timeout per la ricezione di messaggi il
default è 10 ms
|
|
RCV_CONVERT
|
Specifica se deve essere MQ ad
occuparsi della conversione tra differenti CCSID, default = ‘Y’
|
|
RCV_ACCEPT_TRUNC_MSG
|
Permette o meno la ricezione di messaggi con
troncamento, default = ‘Y’
|
|
SYNCPOINT
|
Se specificato Y il sistema di
accodamento/scodamento funziona in maniera transazionale, coinvolgendo quindi
COMMIT e ROLLBACK
|
Per
poter fruire dei comandi è quindi necessario creare un servizio per ogni coppia
qmanager/coda con la quale interagire; a meno di particolari configurazioni è
sufficiente duplicare la rige di default già presente sulla tabelle servizi,
creando una nuova riga con le corrette informazioni, per esempio, per creare un
servizio che punti alla coda MY_QUEUE nel Qmanager MY_QM posso clonare la riga
di default:
INSERT INTO SYSIBM.MQSERVICE
SELECT 'MY_QUEUE_SRV', 'MY_QM',
'MY_QUEUE', CODEDCHARSETID,
ENCODING, DESCRIPTION FROM
SYSIBM.MQSERVICE
WHERE
SERVICENAME ='DB2.DEFAULT.SERVICE';
Lo stesso per quanto riguarda le policy, giusto per avere
una policy personalizzata:
insert into
SYSIBM.MQPOLICY
SELECT 'MYPOLICY', SEND_PRIORITY,
SEND_PERSISTENCE,
SEND_EXPIRY,
SEND_RETRY_COUNT, SEND_RETRY_INTERVAL,
SEND_NEW_CORRELID,
SEND_RESPONSE_MSGID,
SEND_RESPONSE_CORRELID, SEND_EXCEPTION_ACTION,
SEND_REPORT_EXCEPTION, SEND_REPORT_COA,
SEND_REPORT_COD,
SEND_REPORT_EXPIRY, SEND_REPORT_ACTION,
SEND_MSG_TYPE, REPLY_TO_Q,
REPLY_TO_QMGR, RCV_WAIT_INTERVAL,
RCV_CONVERT,
RCV_ACCEPT_TRUNC_MSG, RCV_OPEN_SHARED,
SYNCPOINT, DESCRIPTION FROM
SYSIBM.MQPOLICY;
A questo punto possiamo tentare la lettura di un
messaggio da una coda:
SELECT MQREAD(‘MY_QUEUE_SRV’) from SYSIBM.SYSDUMMY1;
Se
non sono presenti errori di configurazione verrà visualizzato il primo
messaggio (FIFO) presente nella coda puntata dal servizio MY_QUEUE_SRV; in caso
di assenza di messaggi verrà restituito NULL.
Se
si desidera visualizzare l’intero resultset è sufficiente utilizzare la omologa
Table function:
SELECT
T1.* FROM TABLE(MQREADALL(‘MY_QUEUE_SRV’)) as T1;
Ulteriori
parametri permettono di applicare policy particolari, in particolare il secondo
parametro della funzione permette l’applicazione delle Policy:
SELECT
MQREAD(‘MY_QUEUE_SRV’, ‘MY_POLICY’) from SYSIBM.SYSDUMMY1;
SELECT
T1.* FROM TABLE(MQREADALL(‘MY_QUEUE_SRV’, ‘MY_POLICY’)) as T1;
Il
terzo parametro, opzionale, permette di specificare un particolare
Correlation_ID:
SELECT
MQREAD(‘MY_QUEUE_SRV’, ‘MY_POLICY’, ‘AAA’) from SYSIBM.SYSDUMMY1;
Differentemente
dalla funzione scalare la TableFunction può restituire più di un dominio di
valori, in particolare, oltre a restituire il messaggio restituisce altri
metadati:
CORRLEID Id Correlazione
TOPIC Topic
QNAME Nome dalla Coda
MSGID Id del messaggio
MSGFORMAT Tipo di messaggio
Si
può quindi scrivere statement del tipo
SELECT
T1.* FROM TABLE(MQREADALL(‘MY_QUEUE_SRV’, ‘MY_POLICY’)) as T1 where
T1.MSGFORMAT=’MQHMDE’;
E’
inoltre possibile specificare il numero di righe da restituire nel result-set
come terzo parametro:
SELECT
T1.* FROM TABLE(MQREADALL(‘MY_QUEUE_SRV’, ‘MY_POLICY’, 10)) as T1 where
T1.MSGFORMAT=’MQHMDE’;
Scrittura su Coda
La funzione MQSEND, che ha come possibili parametri
Servizio
Policy
Messaggio da inviare in coda
Può essere invocata tramite lo statement VALUES:
VALUES MQSEND(‘MY_QUEUE_SRV’, ‘mio messaggio da inserire
in coda’);
e, opzionalmente specificando una policy:
VALUES MQSEND((‘MY_QUEUE_SRV’, ‘MY_POLICY’, ‘mio
messaggio da inserire in coda’);
o direttamente da una SELECT:
SELECT MQSEND(‘MY_QUEUE_SRV’, COLUMN1) FROM MY_SCHEMA,
MY_TABLE where …
In questo caso viene immesso un messaggio per ogni riga
presente nella tabella MY_TABLE, col contenuto della colonna COLUMN1.
Il valore di ritorno della funzione varrà
1 se la funzione è stata eseguita con esito positivo, 0
negli altri casi.
Committment
Control
E’
utile ricordare che MQ partecipa, se richiesto, al sistema transazionale di
IBMi, in particolare è influenzata e assume idonei comportamenti in caso di job
sotto transazione (commitment control), gestendo correttamente i comandi di
i5OS <COMMIT> e <ROLLBACK>. Nel caso di consumo di messaggi con la
MQRECEIVE un’operazione COMMIT conferma l’avvenuta ricezione, sia su MQ che
eventualmente su DB, mentre una ROLLBACK provoca, come nelle transazioni DB, un
ritorno a situazione iniziale: i messaggi rimangono disponibili sulla coda.
Un caso pratico
MQ viene spesso utilizzato per inviare informazioni da condividere su
altri sistemi; la possibilità di utilizzare funzioni di manipolazione XML
assieme alle funzioni MQ rende molto interessante l’approccio alla condivisione
delle informazioni: XML permette infatti l’invio di metadati che descrivono il
messaggio stesso.
select
mqsend(‘MY_QUEUE_SRV’,
xmlserialize(
xmlelement(NAME
"Document",
xmlagg(XMLELEMENT(
NAME "Row",
xmlattributes
(trim(t.MY_PK) as "pk" ,
'PROPAGATE' as "op"),
xmlforest ( trim(t.DESCRIPTION) as "descr" ,
trim(t.STATUS) as "status"))
)) as varchar(32000))
)
from
MY_SCHEMA.MY_TABLE
T
where MY_PK = 123
questo semplice statement provvede a creare una colonna
di tipo XML, convertirla in varchar (xmlserialize) e inviarla alla coda
determinata dal servizio MY_QUEUE_SRV.
<Document>
<Row pk="123" op="PROPAGATE">
<description>Middle East &
Africa</description>
<status>1</status>
</Row>
</Document>
Limiti, memorandum
Il
DB2 e MQ devono risiedere sulla stessa istanza del sistema (non è quindi
possibile effettuare connessioni a QManager non presenti sul sistema dove è
installato il DB2)
Le
dimensioni massime di un messaggio gestite da questa implementazione sono 32K
per i Varchar e 2Mb per i CLOB; MQ non ha architetturalmente limiti di questo
tipo: è tranquillamente in grado di gestire messaggi di oltre 100Mb per singolo
messaggio ed è inoltre in grado di segmentare i messaggi stessi, rendendo
virtualmente illimitata la dimensione del flusso da trasferire.
Per
motivi prestazionali la sessione DB2 effettua il caching delle tabelle SYSIBM.MQSERVICE
e SYSIBM.MQPOLICY: in una stesso job possono non essere visibili modifiche
effettuate: occorre chiudere il lavoro e riconnettersi per recepire le
modifiche sulle tabelle di configurazione
In
caso di errori restituiti dalle API di MQ le funzioni DB2 restituiscono un
SQLCode negativo: errori più dettagliati sono sempre reperibili nel joblog es
SQL State:
38401
Vendor Code: -443
Message: [SQL0443] cc=2:rc=2058:MQCONN FAILED
Il
rc 2058 viene approfonditamente spiegato nel manuale “MQ Messages”.
Ciao
RispondiEliminainteressante articolo, sai se il servizio è utilizzabile come altre code per publisher/subscriber?
E'possibile usare la stessa coda per gestire topic diversi?
Grazie
usai le P/S molti anni fa; non vedo comunque problemi in quanto la struttura P/S si appoggia sì ai broker per la propagazione ma, alla fine, tutto viene inoltrato tramite code, che non hanno differenti attributi.
Elimina