A continuación veremos en un ejemplo como usar el producto WSO2 BAM.
El caso de uso que se presenta es el de analizar información de un sistema BPM.
El sistema BPM de ejemplo tienen tres tipos de procesos “COT” , “CR” y “POL” , cada uno de ellos puede tener muchas instancias de proceso ejecutándose.
El primer reporte a la izquierda corresponde a la cantidad de instancias por proceso.
El segundo reporte a la derecha es un drill-down de todas las instancias de proceso ejecutándose.
A continuación veremos paso a paso como podemos obtener estos reportes a partir de datos ficticios recogidos por un DataAgent.
1
Descargar la versión WSO2 BAM 2.4.0 de la pagina :
http://wso2.com/products/business-activity-monitor/
2
Unzip del paquete
Iniciar el servidor con wso2server.sh
Cuando se muestra el mensaje “WSO2 Carbon started in X secs” ya esta iniciado
3
Ejecutar el cliente java que registra los datos en Casandra usando el API Data Agent
DataPublisher dataPublisher = new DataPublisher("tcp://localhost:7611", "admin", "admin");
String streamId1 = dataPublisher.defineStream("{" +
" 'name':'es.demowso2bam.tablaprocesos1'," +
" 'version':'2.3.0'," +
" 'nickName': 'Tabla de procesos'," +
" 'description': 'Some Desc'," +
" 'tags':['foo', 'bar']," +
" 'metaData':[" +
" {'name':'ipAdd','type':'STRING'}" +
" ]," +
" 'payloadData':[" +
" {'name':'idproceso','type':'INT'}," +
" {'name':'estado','type':'STRING'}," +
" {'name':'fechainicio','type':'STRING'}," +
" {'name':'fechaactualizacion','type':'STRING'}," +
" {'name':'compania','type':'STRING'}," +
" {'name':'ramo','type':'STRING'}," +
" {'name':'poliza','type':'INT'}," +
" {'name':'agente','type':'STRING'}" +
" ]" +
"}");
log.info("1st stream defined: "+streamId1); //In this case correlation data is null
dataPublisher.publish(streamId1, new Object[]{"127.0.0.1"}, null, new Object[]{ 1 , "CR","01-02-2013","01-04-2013","G","US",1,"04183"});
4
Login en la pantalla admin https://<hostname>:9443/carbon
usuario : admin
passsword : admin
5
Acceder a la vista de Cassandra para ver las tablas creadas con el cliente Data Agent
6
Acceder a la vista Analítica para añadir queryes Hive que procesaran los datos
Ver el fichero de consultas en la última parte de este tutorial
7
Ejecutar los queryes Hive y Guardarlos
El detalle de los queryes están en el ultimo paso de este tutorial
8
Para ver los datos procesados por Hive en la base de datos H2 , debemos habilitar su puerto web , para esto descomentamos las líneas resaltadas y reiniciamos el servidor : WSO2_HOME/wso2bam-2.4.0/repository/conf$ carbon.xml
<H2DatabaseConfiguration>
<property name="web" />
<property name="webPort">8082</property>
<property name="webAllowOthers" />
<!--property name="webSSL" />
<property name="tcp" />
<property name="tcpPort">9092</property>
<property name="tcpAllowOthers" />
<property name="tcpSSL" />
<property name="pg" />
<property name="pgPort">5435</property>
<property name="pgAllowOthers" />
<property name="trace" />
<property name="baseDir">${carbon.home}</property-->
</H2DatabaseConfiguration>
url de datos relacional H2 : http://wso2-desktop:8082
jdbc:h2:repository/database/samples/BAM_STATS_DB;AUTO_SERVER=TRUE
Driver Class Name : org.h2.Driver
User Name : wso2carbon
Password : wso2carbon
9
Ahora generaremos un gadget para visualizar los datos procesados jdbc:h2:repository/database/samples/BAM_STATS_DB;AUTO_SERVER=TRUE
Driver Class Name : org.h2.Driver
User Name : wso2carbon
Password : wso2carbon
10
Seleccionar la query hacia tablas de H2
11
Seleccionar Preview
12
Ingresar Metadatos
13
Se genera fichero xml donde se guarda el gadget
14
Ir al repositorio de Gadgets Escoger : Add
15
Añadir los metadatos y el url generado en el paso anterior http://wso2-desktop:9763/registry/resource/_system/config/repository/components/org.wso2.carbon.bam.gadgetgen/gadgetgen/sumario_procesos.xml
16
Veremos que ya está guardado en el repositorio
17
Ahora Ir a la opción GadgetServer
Escoger : Add Gadgets
18
Escoger : Add
Y seleccionar nuestro gadget de la lista
19
Veremos como se despliega en la pestaña Home
20
Para obtener el reporte del detalle de procesos repitamos todos los pasos del 10 al 21 con el query:
Select IDPROCESO,ESTADO,FECHAINICIO,FECHAACTUALIZACION,COMPANIA,RAMO,POLIZA,AGENTE from DETALLE_PROCESOS
23
ANEXO : fichero de consultas de analítica .
Antes de ejecutarlo quitarle comentarios.
--QUERYS PARA TABLA DE PROCESOS
--ok hive apuntando a casandra CREATE EXTERNAL TABLE IF NOT EXISTS hive_tablaprocesos1 (
logId STRING,idproceso INT,estado STRING,
fechainicio STRING,fechaactualizacion STRING,
compania STRING,ramo STRING,poliza INT,agente STRING)
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
WITH SERDEPROPERTIES (
"wso2.carbon.datasource.name" = "WSO2BAM_CASSANDRA_DATASOURCE" ,
"cassandra.cf.name" = "es_demowso2bam_tablaprocesos1" ,
"cassandra.columns.mapping" =
":key,payload_idproceso, payload_estado, payload_fechainicio,payload_fechaactualizacion,payload_compania,payload_ramo, payload_poliza,payload_agente" );
--hive apuntando a h2 -- procesada de recuenta de tareas agrupadas por estadoCREATE EXTERNAL TABLE IF NOT EXISTS sumario_procesos1(
proceso STRING, proceso_count INT)
STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler'
TBLPROPERTIES (
'wso2.carbon.datasource.name'='WSO2BAM_DATASOURCE',
'hive.jdbc.update.on.duplicate' = 'true' ,
'hive.jdbc.primary.key.fields' = 'proceso' ,
'hive.jdbc.table.create.query' =
'CREATE TABLE sumario_procesos1 (proceso VARCHAR(100) NOT NULL PRIMARY KEY,
proceso_count INT)' );
--hive apuntando a h2 -- procesada de detalle de tareas ordenadas por estado
CREATE EXTERNAL TABLE IF NOT EXISTS detalle_procesos1 (
logId STRING,idproceso INT,estado STRING,
fechainicio STRING,fechaactualizacion STRING,
compania STRING,ramo STRING,poliza INT,agente STRING)
STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler'
TBLPROPERTIES (
'wso2.carbon.datasource.name'='WSO2BAM_DATASOURCE',
'hive.jdbc.update.on.duplicate' = 'true' ,
'hive.jdbc.primary.key.fields' = 'logId' ,
'hive.jdbc.table.create.query' =
'CREATE TABLE detalle_procesos1 (logId VARCHAR(100),idproceso INT ,
estado VARCHAR(100),fechainicio VARCHAR(100),fechaactualizacion VARCHAR(100),compania VARCHAR(100),
ramo VARCHAR(100),poliza INT,agente VARCHAR(100))' );
--analitica -- procesada de recuenta de tareas agrupadas por estado insert overwrite table sumario_procesos1
select estado as proceso,count(idproceso) as proceso_count
from hive_tablaprocesos1
group by estado;
-- analitica ---- procesada de detalle de tareas ordenadas por estado
insert overwrite table detalle_procesos1
select logId,idproceso,estado,fechainicio,fechaactualizacion,compania,ramo,poliza,agente
from hive_tablaprocesos1
order by estado;
FIN