lunedì 11 novembre 2013

Integrazione MQ e DB2


Integrazione MQ e DB2
Con la versione IBM i 7.1 il DB2 permette strette interazioni con MQ (Websphere MQ Series), rendendo, di fatto, molto semplici le procedure d’integrazione tra i due prodotti.



Websphere MQ
Per chi non conosce MQ qualche coordinata per collocarlo nel panorama IT:
IBM Websphere MQ è un software (middleware) che permette la comunicazione certa via rete tra piattaforme anche eterogenee e applicazioni potenzialmente non concorrenti.
Le keyword in questa semplice definizione sono
·       Comunicazione: è un prodotto che permette di inviare e ricevere informazioni anche non strutturate, denominate Messaggi.
·       Certa: c’è la certezza che di un messaggio esista una sola istanza, o nel sistema di trasmissione o in quello di ricezione: non esiste il caso dubbio di messaggi duplicati.
·       Piattaforme eterogenee: innumerevoli piattaforme server e altrettante client che possono condividere l’architettura, i comandi e le configurazioni del prodotto, gestendo in maniera trasparente i diversi encoding (es conversioni ASCII – EBCDIC)
·       Applicazioni non concorrenti: l’invio di un messaggio è garantito: è il prodotto che si occupa dello strato di comunicazione, dei retry in caso di fail o di indisponibilità di uno dei sistemi partner, garantendo anche il disaccoppiamento tra sistemi.
In Websphere MQ una destinazione è detta MessageQueue e ogni MessageQueue è catalogata in un QueueManager. Le attività basilari sono quindi l’inserimento di un messaggio in una coda e la lettura di un messaggio da una coda tramite un’interfaccia applicativa.





DB2 e MQ
Le funzioni disponibili dalla 7.1 permettono di
Leggere da una coda senza <consumare> il messaggio (Browsing)
Leggere da una coda
Inviare messaggi

Le tre possibili funzioni sono implementate sia come funzioni scalari - UDF (interazione con un solo messaggio per volta) sia come table-functions UDTF (interazione con set di messaggi)

UDF – Scalar Functions
MQREAD
Legge un messaggio in una colonna VarChar senza consumarlo
MQREADCLOB
Legge un messaggio in una colonna CLob senza consumarlo
MQRECEIVE
Legge un messaggio in una colonna VarChar e lo rimuove dalla coda
MQRECEIVECLOB
Legge un messaggio in una colonna CLob e lo rimuove dalla coda
MQSEND
Inserisce un messaggio in coda


UDTF – Table Functions
MQREADALL
Legge un set di messaggi in una colonna VarChar  senza consumarli
MQREADALLCLOB
Legge un set di messaggi in una colonna Clob senza consumarli
MQRECEIVEALL
Legge un set di messaggi in una colonna VarChar rimuovendoli dalla coda
MQRECEIVEALLCLOB
Legge un set di messaggi in una colonna Clob rimuovendoli dalla coda

E’ interessante l’implementazione delle funzioni READ e RECEIVE:  operano in maniera simile leggendo messaggi da una coda, differenziandosi nella modalità di browse (esame senza “consumo”) e get pura (consumando quindi il messaggio)

Poiché i parametri per scrivere e leggere su una coda possono essere veramente i più vari è stato scelta una modalità di parametrizzazione che semplifica molto lo statement SQL, delegando l’insieme dei parametri e delle configurazioni possibili a due tabelle gestite dall’utente.


Definizioni
Nome Tabella
Default
Contenuti
MQ services
SYSIBM.MQSERVICE
DB2.DEFAULT.SERVICE
Definisce QManager e Coda con la quale si ha interazione
MQ policy
SYSIBM.MQPOLICY
DB2.DEFAULT.POLICY
Definisce come i messaggi vengono trattati, in lettura e scrittura

I servizi creano un riferimento incrociato  tra un nome (da utilizzare nella function) e la reale coda e qmanager; in questo modo l’amministratore del sistema maschera la complessità della configurazione fornendo un nome simbolico (SERVICENAME)

Colonna
Descrizione
SERVICENAME
PrimaryKey della tabella, contiene il nome simbolico del servizio
QUEUEMANAGER
Nome del Queue Manager, opzionale: se non specificato verrà utilizzato il QManager di default (se configurato)
INPUTQUEUE
nome della coda
CODEDCHARSET
CCSID dei dati del messaggio da inviare/ricevere; il valore speciale -3 istruisce la funzione ad utilizzare il JOBCCSID
ENCODING
Definisce il tipo di encoding MQ, lasciare a 0
DESCRIPTION
Descrizione utente

Le policy definiscono, per esempio, se il sistema dovrà essere transazionale, i millisecondi di retry in caso di errore, la possibilità di accettare o meno messaggi eccedenti la dimensione della variabile destinata ad accoglierli; normalmente sono sufficienti pochi record di configurazione per coprire le proprie esigenze.
E’ compito di chi configura il sistema inserire righe nelle due tabelle per poter fruire dei servizi.

In questa tabelle elenco le colonne più importanti
Colonna
Descrizione
POLICYNAME
PrimaryKey della tabella, contiene il nome simbolico del servizio
SEND_PRIORITY
Priorità del messaggio, default -1
SEND_RETRY_COUNT
 numero dei tentativi in caso di fail della procedura (default 5)
SEND_RETRY_INTERVAL
Intervallo in ms tra un tentativo e l’altro (default 1000)
SEND_MSG_TYPE
Determina il tipo (formattazione) di messaggio, normalmente DTG = Datagramma
RCV_WAIT_INTERVAL
Specifica il timeout per la ricezione di messaggi il default è 10 ms
RCV_CONVERT
Specifica se deve essere MQ ad occuparsi della conversione tra differenti CCSID, default = ‘Y’
RCV_ACCEPT_TRUNC_MSG
Permette o meno la ricezione di messaggi con troncamento, default = ‘Y’
SYNCPOINT
Se specificato Y il sistema di accodamento/scodamento funziona in maniera transazionale, coinvolgendo quindi COMMIT e ROLLBACK

Per poter fruire dei comandi è quindi necessario creare un servizio per ogni coppia qmanager/coda con la quale interagire; a meno di particolari configurazioni è sufficiente duplicare la rige di default già presente sulla tabelle servizi, creando una nuova riga con le corrette informazioni, per esempio, per creare un servizio che punti alla coda MY_QUEUE nel Qmanager MY_QM posso clonare la riga di default:

INSERT INTO SYSIBM.MQSERVICE
SELECT 'MY_QUEUE_SRV', 'MY_QM', 'MY_QUEUE', CODEDCHARSETID,
ENCODING, DESCRIPTION FROM SYSIBM.MQSERVICE
WHERE SERVICENAME ='DB2.DEFAULT.SERVICE';

Lo stesso per quanto riguarda le policy, giusto per avere una policy personalizzata:
insert into  SYSIBM.MQPOLICY                                      
SELECT 'MYPOLICY', SEND_PRIORITY, SEND_PERSISTENCE, SEND_EXPIRY,                   
SEND_RETRY_COUNT, SEND_RETRY_INTERVAL, SEND_NEW_CORRELID,         
SEND_RESPONSE_MSGID, SEND_RESPONSE_CORRELID, SEND_EXCEPTION_ACTION,
SEND_REPORT_EXCEPTION, SEND_REPORT_COA, SEND_REPORT_COD,          
SEND_REPORT_EXPIRY, SEND_REPORT_ACTION, SEND_MSG_TYPE, REPLY_TO_Q,
REPLY_TO_QMGR, RCV_WAIT_INTERVAL, RCV_CONVERT,                    
RCV_ACCEPT_TRUNC_MSG, RCV_OPEN_SHARED, SYNCPOINT, DESCRIPTION FROM
SYSIBM.MQPOLICY;


A questo punto possiamo tentare la lettura di un messaggio da una coda:

SELECT MQREAD(‘MY_QUEUE_SRV’) from SYSIBM.SYSDUMMY1;
Se non sono presenti errori di configurazione verrà visualizzato il primo messaggio (FIFO) presente nella coda puntata dal servizio MY_QUEUE_SRV; in caso di assenza di messaggi verrà restituito NULL.

Se si desidera visualizzare l’intero resultset è sufficiente utilizzare la omologa Table function:
SELECT T1.* FROM TABLE(MQREADALL(‘MY_QUEUE_SRV’)) as T1;

Ulteriori parametri permettono di applicare policy particolari, in particolare il secondo parametro della funzione permette l’applicazione delle Policy:
SELECT MQREAD(‘MY_QUEUE_SRV’, ‘MY_POLICY’) from SYSIBM.SYSDUMMY1;
SELECT T1.* FROM TABLE(MQREADALL(‘MY_QUEUE_SRV’, ‘MY_POLICY’)) as T1;

Il terzo parametro, opzionale, permette di specificare un particolare Correlation_ID:
SELECT MQREAD(‘MY_QUEUE_SRV’, ‘MY_POLICY’, ‘AAA’) from SYSIBM.SYSDUMMY1;

Differentemente dalla funzione scalare la TableFunction può restituire più di un dominio di valori, in particolare, oltre a restituire il messaggio restituisce altri metadati:
CORRLEID              Id Correlazione
TOPIC     Topic
QNAME Nome dalla Coda
MSGID   Id del messaggio
MSGFORMAT       Tipo di messaggio

Si può quindi scrivere statement del tipo
SELECT T1.* FROM TABLE(MQREADALL(‘MY_QUEUE_SRV’, ‘MY_POLICY’)) as T1 where T1.MSGFORMAT=’MQHMDE’;

E’ inoltre possibile specificare il numero di righe da restituire nel result-set come terzo parametro:
SELECT T1.* FROM TABLE(MQREADALL(‘MY_QUEUE_SRV’, ‘MY_POLICY’, 10)) as T1 where T1.MSGFORMAT=’MQHMDE’;



Scrittura su Coda
La funzione MQSEND, che ha come possibili parametri
Servizio
Policy
Messaggio da inviare in coda

Può essere invocata tramite lo statement VALUES:

VALUES MQSEND(‘MY_QUEUE_SRV’, ‘mio messaggio da inserire in coda’);

e, opzionalmente specificando una policy:
VALUES MQSEND((‘MY_QUEUE_SRV’, ‘MY_POLICY’, ‘mio messaggio da inserire in coda’);

o direttamente da una SELECT:

SELECT MQSEND(‘MY_QUEUE_SRV’, COLUMN1) FROM MY_SCHEMA, MY_TABLE where …
In questo caso viene immesso un messaggio per ogni riga presente nella tabella MY_TABLE, col contenuto della colonna COLUMN1.

Il valore di ritorno della funzione varrà
1 se la funzione è stata eseguita con esito positivo, 0 negli altri casi.

Committment Control
E’ utile ricordare che MQ partecipa, se richiesto, al sistema transazionale di IBMi, in particolare è influenzata e assume idonei comportamenti in caso di job sotto transazione (commitment control), gestendo correttamente i comandi di i5OS <COMMIT> e <ROLLBACK>. Nel caso di consumo di messaggi con la MQRECEIVE un’operazione COMMIT conferma l’avvenuta ricezione, sia su MQ che eventualmente su DB, mentre una ROLLBACK provoca, come nelle transazioni DB, un ritorno a situazione iniziale: i messaggi rimangono disponibili sulla coda.


Un caso pratico
MQ viene spesso utilizzato per inviare informazioni da condividere su altri sistemi; la possibilità di utilizzare funzioni di manipolazione XML assieme alle funzioni MQ rende molto interessante l’approccio alla condivisione delle informazioni: XML permette infatti l’invio di metadati che descrivono il messaggio stesso.

select
mqsend(‘MY_QUEUE_SRV’,
                        xmlserialize(
                                    xmlelement(NAME "Document",
                                    xmlagg(XMLELEMENT( NAME "Row",
                                    xmlattributes (trim(t.MY_PK) as "pk" ,
'PROPAGATE'  as "op"),
xmlforest (     trim(t.DESCRIPTION) as "descr" ,
trim(t.STATUS) as "status"))
))  as varchar(32000))
            )
 from
MY_SCHEMA.MY_TABLE T
where MY_PK = 123


questo semplice statement provvede a creare una colonna di tipo XML, convertirla in varchar (xmlserialize) e inviarla alla coda determinata dal servizio MY_QUEUE_SRV.

<Document>
<Row pk="123" op="PROPAGATE">
<description>Middle East &amp; Africa</description>
<status>1</status>
</Row>
</Document>




Limiti, memorandum
Il DB2 e MQ devono risiedere sulla stessa istanza del sistema (non è quindi possibile effettuare connessioni a QManager non presenti sul sistema dove è installato il DB2)

Le dimensioni massime di un messaggio gestite da questa implementazione sono 32K per i Varchar e 2Mb per i CLOB; MQ non ha architetturalmente limiti di questo tipo: è tranquillamente in grado di gestire messaggi di oltre 100Mb per singolo messaggio ed è inoltre in grado di segmentare i messaggi stessi, rendendo virtualmente illimitata la dimensione del flusso da trasferire.

Per motivi prestazionali la sessione DB2 effettua il caching delle tabelle SYSIBM.MQSERVICE e SYSIBM.MQPOLICY: in una stesso job possono non essere visibili modifiche effettuate: occorre chiudere il lavoro e riconnettersi per recepire le modifiche sulle tabelle di configurazione

In caso di errori restituiti dalle API di MQ le funzioni DB2 restituiscono un SQLCode negativo: errori più dettagliati sono sempre reperibili nel joblog es
SQL State: 38401
 Vendor Code: -443
 Message: [SQL0443] cc=2:rc=2058:MQCONN FAILED
Il rc 2058 viene approfonditamente spiegato nel manuale “MQ Messages”.

2 commenti:

  1. Ciao

    interessante articolo, sai se il servizio è utilizzabile come altre code per publisher/subscriber?
    E'possibile usare la stessa coda per gestire topic diversi?

    Grazie

    RispondiElimina
    Risposte
    1. usai le P/S molti anni fa; non vedo comunque problemi in quanto la struttura P/S si appoggia sì ai broker per la propagazione ma, alla fine, tutto viene inoltrato tramite code, che non hanno differenti attributi.

      Elimina