作为MQ的Redis
原创大约 6 分钟
事实上,Redis是可以作为消息队列来用的。
先创建属性配置类 Configuration。
package com.jedis.mq;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
 * Application configuration backed by {@link Properties}.
 *
 * <p>Settings are read once, at construction time, from the {@code config.xml}
 * classpath resource via {@link Properties#loadFromXML(InputStream)}. Every
 * typed getter falls back to the caller-supplied default when the key is
 * absent or its value is empty.
 */
public class Configuration extends Properties {
    private static final long serialVersionUID = 50440463580273222L;

    /** Classpath resource the settings are loaded from. */
    private static final String CONFIG_RESOURCE = "config.xml";

    private static Configuration instance = null;

    /**
     * Returns the shared instance, creating it on first use.
     *
     * @return the lazily created singleton
     */
    public static synchronized Configuration getInstance() {
        if (instance == null) {
            instance = new Configuration();
        }
        return instance;
    }

    /**
     * Looks up a property, treating an empty value the same as a missing one.
     *
     * @param key          property name
     * @param defaultValue value returned when the property is missing or empty
     * @return the stored value, or {@code defaultValue}
     */
    @Override
    public String getProperty(String key, String defaultValue) {
        String val = getProperty(key);
        return (val == null || val.isEmpty()) ? defaultValue : val;
    }

    /**
     * Alias of {@link #getProperty(String, String)}.
     *
     * @param name         property name
     * @param defaultValue value returned when the property is missing or empty
     * @return the stored value, or {@code defaultValue}
     */
    public String getString(String name, String defaultValue) {
        return this.getProperty(name, defaultValue);
    }

    /**
     * Reads a property as an {@code int}.
     *
     * @param name         property name
     * @param defaultValue value returned when the property is missing or blank
     * @return the parsed value, or {@code defaultValue}
     * @throws NumberFormatException if the value is present but not a valid int
     */
    public int getInt(String name, int defaultValue) {
        String val = numericText(name);
        return val == null ? defaultValue : Integer.parseInt(val);
    }

    /**
     * Reads a property as a {@code long}.
     *
     * @param name         property name
     * @param defaultValue value returned when the property is missing or blank
     * @return the parsed value, or {@code defaultValue}
     * @throws NumberFormatException if the value is present but not a valid long
     */
    public long getLong(String name, long defaultValue) {
        String val = numericText(name);
        // Long.parseLong, not Integer.parseInt: values beyond the int range are valid here.
        return val == null ? defaultValue : Long.parseLong(val);
    }

    /**
     * Reads a property as a {@code float}.
     *
     * @param name         property name
     * @param defaultValue value returned when the property is missing or blank
     * @return the parsed value, or {@code defaultValue}
     * @throws NumberFormatException if the value is present but not a valid float
     */
    public float getFloat(String name, float defaultValue) {
        String val = numericText(name);
        return val == null ? defaultValue : Float.parseFloat(val);
    }

    /**
     * Reads a property as a {@code double}.
     *
     * @param name         property name
     * @param defaultValue value returned when the property is missing or blank
     * @return the parsed value, or {@code defaultValue}
     * @throws NumberFormatException if the value is present but not a valid double
     */
    public double getDouble(String name, double defaultValue) {
        String val = numericText(name);
        return val == null ? defaultValue : Double.parseDouble(val);
    }

    /**
     * Reads a property as a {@code byte}.
     *
     * @param name         property name
     * @param defaultValue value returned when the property is missing or blank
     * @return the parsed value, or {@code defaultValue}
     * @throws NumberFormatException if the value is present but not a valid byte
     */
    public byte getByte(String name, byte defaultValue) {
        String val = numericText(name);
        return val == null ? defaultValue : Byte.parseByte(val);
    }

    /**
     * Loads {@code config.xml} from the system class loader.
     *
     * <p>When the resource is missing the instance stays empty, so every getter
     * returns its default instead of the constructor failing with a
     * {@link NullPointerException}.
     */
    public Configuration() {
        InputStream in = ClassLoader.getSystemClassLoader().getResourceAsStream(CONFIG_RESOURCE);
        if (in == null) {
            System.err.println("Configuration: resource '" + CONFIG_RESOURCE
                    + "' not found on the classpath; falling back to defaults");
            return;
        }
        // try-with-resources guarantees the stream is closed on the failure path too.
        try (InputStream xml = in) {
            this.loadFromXML(xml);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Returns the trimmed value for {@code name}, or {@code null} when it is missing or blank. */
    private String numericText(String name) {
        String val = this.getProperty(name);
        if (val == null) {
            return null;
        }
        String trimmed = val.trim();
        return trimmed.isEmpty() ? null : trimmed;
    }
}
再定义消息实体类 Message。
package com.jedis.mq;
import java.io.Serializable;
/**
 * Message entity exchanged through the queue.
 *
 * <p>A mutable value holder with a numeric id and a text payload; it is
 * {@link Serializable} so it can be converted to and from {@code byte[]}.
 */
public class Message implements Serializable {

    private static final long serialVersionUID = 7792729L;

    /** Numeric identifier of the message. */
    private int id;

    /** Text payload of the message. */
    private String content;

    /**
     * Creates a message.
     *
     * @param id      numeric identifier
     * @param content text payload
     */
    public Message(int id, String content) {
        this.content = content;
        this.id = id;
    }

    /** @return the numeric identifier */
    public int getId() {
        return this.id;
    }

    /** @return the text payload */
    public String getContent() {
        return this.content;
    }

    /** @param id the new numeric identifier */
    public void setId(int id) {
        this.id = id;
    }

    /** @param content the new text payload */
    public void setContent(String content) {
        this.content = content;
    }
}
创建对象转换的工具类 ObjectUtil。
package com.jedis.mq;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
/**
 * Converts between objects and {@code byte[]} using Java native serialization.
 *
 * <p>NOTE(review): native deserialization must never be applied to bytes from an
 * untrusted source; if the Redis instance can be written to by other parties,
 * prefer a data format such as JSON or install an {@code ObjectInputFilter}.
 */
public class ObjectUtil {

    /**
     * Serializes an object to a byte array.
     *
     * @param obj the object to serialize; {@code null} is written as a serialized null
     * @return the serialized form
     * @throws Exception if {@code obj} is not serializable or writing fails
     */
    public static byte[] objectToBytes(Object obj) throws Exception {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(obj);
            // Push everything into the buffer before the snapshot is taken.
            out.flush();
        }
        return buffer.toByteArray();
    }

    /**
     * Deserializes an object from a byte array.
     *
     * @param bytes the serialized form; may be {@code null}, e.g. when a queue pop
     *              found nothing
     * @return the deserialized object, or {@code null} when {@code bytes} is {@code null}
     * @throws Exception if the bytes are not a valid serialized stream or the class
     *                   of the object cannot be found
     */
    public static Object bytesToObject(byte[] bytes) throws Exception {
        if (bytes == null) {
            return null;
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }
}
最后是用于消息收发的 JedisMQ。
package com.jedis.mq;
import java.util.List;
import java.util.Map;
import java.util.Set;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
 * Static helper for sending and receiving data through a shared {@link JedisPool}.
 *
 * <p>Every operation borrows a connection, runs one or two Redis commands and
 * hands the connection back. Failures are printed via
 * {@link Exception#printStackTrace()} and reported to the caller as
 * {@code null} (or {@code 0} for {@link #llen(byte[])}); nothing is rethrown.
 * A connection that failed is returned to the pool as broken, and exactly once.
 */
@SuppressWarnings({"deprecation", "unused"})
public class JedisMQ {
    /** Connection/socket timeout handed to the pool, in milliseconds. */
    private static final int POOL_TIMEOUT_MS = 60000;

    private static String JEDIS_IP;
    private static int JEDIS_PORT;
    private static String JEDIS_PASSWORD;
    // private static String JEDIS_SLAVE;
    private static JedisPool jedisPool;

    static {
        Configuration conf = Configuration.getInstance();
        JEDIS_IP = conf.getString("jedis.ip", "127.0.0.1");
        JEDIS_PORT = conf.getInt("jedis.port", 6379);
        JEDIS_PASSWORD = conf.getString("jedis.password", null);
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxIdle(256);// 20
        config.setMaxWaitMillis(5000L);
        config.setTestOnBorrow(true);
        config.setTestOnReturn(true);
        config.setTestWhileIdle(true);
        config.setMinEvictableIdleTimeMillis(60000L);
        config.setTimeBetweenEvictionRunsMillis(3000L);
        config.setNumTestsPerEvictionRun(-1);
        if (JEDIS_PASSWORD == null || JEDIS_PASSWORD.isEmpty()) {
            jedisPool = new JedisPool(config, JEDIS_IP, JEDIS_PORT, POOL_TIMEOUT_MS);
        } else {
            // Authenticate only when a password is actually configured.
            jedisPool = new JedisPool(config, JEDIS_IP, JEDIS_PORT, POOL_TIMEOUT_MS, JEDIS_PASSWORD);
        }
    }

    /**
     * Hands a borrowed connection back to the pool exactly once.
     *
     * @param jedis  the connection, or {@code null} if borrowing itself failed
     * @param broken {@code true} if an operation on the connection threw
     */
    private static void release(Jedis jedis, boolean broken) {
        if (jedis == null) {
            return;
        }
        if (!broken) {
            close(jedis);
            return;
        }
        try {
            jedisPool.returnBrokenResource(jedis);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * GET for a string key.
     *
     * @param key the key
     * @return the value, or {@code null} if the key is absent or the call failed
     */
    public static String get(String key) {
        String value = null;
        Jedis jedis = null;
        boolean broken = false;
        try {
            jedis = jedisPool.getResource();
            value = jedis.get(key);
        } catch (Exception e) {
            broken = true;
            e.printStackTrace();
        } finally {
            release(jedis, broken);
        }
        return value;
    }

    /**
     * Returns a healthy connection to the pool; if the pool rejects it, the
     * connection is shut down instead so the socket is not leaked.
     *
     * @param jedis the connection to return; {@code null} is ignored
     */
    public static void close(Jedis jedis) {
        if (jedis == null) {
            return;
        }
        try {
            jedisPool.returnResource(jedis);
        } catch (Exception e) {
            try {
                if (jedis.isConnected()) {
                    try {
                        jedis.quit();
                    } finally {
                        jedis.disconnect();
                    }
                }
            } catch (Exception ignored) {
                // Best effort: the connection is already unusable.
            }
        }
    }

    /**
     * GET for a binary key.
     *
     * @param key the key
     * @return the value, or {@code null} if the key is absent or the call failed
     */
    public static byte[] get(byte[] key) {
        byte[] value = null;
        Jedis jedis = null;
        boolean broken = false;
        try {
            jedis = jedisPool.getResource();
            value = jedis.get(key);
        } catch (Exception e) {
            broken = true;
            e.printStackTrace();
        } finally {
            release(jedis, broken);
        }
        return value;
    }

    /**
     * SET for a binary key.
     *
     * @param key   the key
     * @param value the value to store
     */
    public static void set(byte[] key, byte[] value) {
        Jedis jedis = null;
        boolean broken = false;
        try {
            jedis = jedisPool.getResource();
            jedis.set(key, value);
        } catch (Exception e) {
            broken = true;
            e.printStackTrace();
        } finally {
            release(jedis, broken);
        }
    }

    /**
     * SET followed by EXPIRE for a binary key (two separate commands).
     *
     * @param key   the key
     * @param value the value to store
     * @param time  the argument passed to EXPIRE
     */
    public static void set(byte[] key, byte[] value, int time) {
        Jedis jedis = null;
        boolean broken = false;
        try {
            jedis = jedisPool.getResource();
            jedis.set(key, value);
            jedis.expire(key, time);
        } catch (Exception e) {
            broken = true;
            e.printStackTrace();
        } finally {
            release(jedis, broken);
        }
    }

    /**
     * HSET with binary arguments.
     *
     * @param key   the hash key
     * @param field the field inside the hash
     * @param value the value to store
     */
    public static void hset(byte[] key, byte[] field, byte[] value) {
        Jedis jedis = null;
        boolean broken = false;
        try {
            jedis = jedisPool.getResource();
            jedis.hset(key, field, value);
        } catch (Exception e) {
            broken = true;
            e.printStackTrace();
        } finally {
            release(jedis, broken);
        }
    }

    /**
     * HSET with string arguments.
     *
     * @param key   the hash key
     * @param field the field inside the hash
     * @param value the value to store
     */
    public static void hset(String key, String field, String value) {
        Jedis jedis = null;
        boolean broken = false;
        try {
            jedis = jedisPool.getResource();
            jedis.hset(key, field, value);
        } catch (Exception e) {
            broken = true;
            e.printStackTrace();
        } finally {
            release(jedis, broken);
        }
    }

    /**
     * HGET with string arguments.
     *
     * @param key   the hash key
     * @param field the field inside the hash
     * @return the field value, or {@code null} if absent or the call failed
     */
    public static String hget(String key, String field) {
        String value = null;
        Jedis jedis = null;
        boolean broken = false;
        try {
            jedis = jedisPool.getResource();
            value = jedis.hget(key, field);
        } catch (Exception e) {
            broken = true;
            e.printStackTrace();
        } finally {
            release(jedis, broken);
        }
        return value;
    }

    /**
     * HGET with binary arguments.
     *
     * @param key   the hash key
     * @param field the field inside the hash
     * @return the field value, or {@code null} if absent or the call failed
     */
    public static byte[] hget(byte[] key, byte[] field) {
        byte[] value = null;
        Jedis jedis = null;
        boolean broken = false;
        try {
            jedis = jedisPool.getResource();
            value = jedis.hget(key, field);
        } catch (Exception e) {
            broken = true;
            e.printStackTrace();
        } finally {
            release(jedis, broken);
        }
        return value;
    }

    /**
     * HDEL with binary arguments.
     *
     * @param key   the hash key
     * @param field the field to remove
     */
    public static void hdel(byte[] key, byte[] field) {
        Jedis jedis = null;
        boolean broken = false;
        try {
            jedis = jedisPool.getResource();
            jedis.hdel(key, field);
        } catch (Exception e) {
            broken = true;
            e.printStackTrace();
        } finally {
            release(jedis, broken);
        }
    }

    /**
     * LPUSH: adds a value at the head of the list stored at {@code key}.
     *
     * @param key   the list key
     * @param value the element to push
     */
    public static void lpush(byte[] key, byte[] value) {
        Jedis jedis = null;
        boolean broken = false;
        try {
            jedis = jedisPool.getResource();
            jedis.lpush(key, value);
        } catch (Exception e) {
            broken = true;
            e.printStackTrace();
        } finally {
            release(jedis, broken);
        }
    }

    /**
     * RPUSH: adds a value at the tail of the list stored at {@code key}.
     *
     * @param key   the list key
     * @param value the element to push
     */
    public static void rpush(byte[] key, byte[] value) {
        Jedis jedis = null;
        boolean broken = false;
        try {
            jedis = jedisPool.getResource();
            jedis.rpush(key, value);
        } catch (Exception e) {
            broken = true;
            e.printStackTrace();
        } finally {
            release(jedis, broken);
        }
    }

    /**
     * RPOPLPUSH: moves the tail element of {@code key} onto {@code destination}.
     *
     * <p>The moved element is not returned to the caller; the method is kept
     * {@code void} for compatibility with existing callers.
     *
     * @param key         the source list key
     * @param destination the destination list key
     */
    public static void rpoplpush(byte[] key, byte[] destination) {
        Jedis jedis = null;
        boolean broken = false;
        try {
            jedis = jedisPool.getResource();
            jedis.rpoplpush(key, destination);
        } catch (Exception e) {
            broken = true;
            e.printStackTrace();
        } finally {
            release(jedis, broken);
        }
    }

    /**
     * Reads the whole list stored at {@code key} with {@code LRANGE key 0 -1}.
     *
     * <p>Despite the name, nothing is popped: the list is left untouched.
     *
     * @param key the list key
     * @return all elements, or {@code null} if the call failed
     */
    public static List<byte[]> lpopList(byte[] key) {
        List<byte[]> list = null;
        Jedis jedis = null;
        boolean broken = false;
        try {
            jedis = jedisPool.getResource();
            list = jedis.lrange(key, 0, -1);
        } catch (Exception e) {
            broken = true;
            e.printStackTrace();
        } finally {
            release(jedis, broken);
        }
        return list;
    }

    /**
     * RPOP: removes and returns the tail element of the list stored at {@code key}.
     *
     * @param key the list key
     * @return the element, or {@code null} if the list is empty or the call failed
     */
    public static byte[] rpop(byte[] key) {
        byte[] bytes = null;
        Jedis jedis = null;
        boolean broken = false;
        try {
            jedis = jedisPool.getResource();
            bytes = jedis.rpop(key);
        } catch (Exception e) {
            broken = true;
            e.printStackTrace();
        } finally {
            release(jedis, broken);
        }
        return bytes;
    }

    /**
     * HMSET followed by EXPIRE; the key is {@code key.toString()}.
     *
     * @param key  object whose string form is the hash key
     * @param hash field/value pairs to store
     * @param time the argument passed to EXPIRE
     */
    public static void hmset(Object key, Map<String, String> hash, int time) {
        Jedis jedis = null;
        boolean broken = false;
        try {
            jedis = jedisPool.getResource();
            jedis.hmset(key.toString(), hash);
            jedis.expire(key.toString(), time);
        } catch (Exception e) {
            broken = true;
            e.printStackTrace();
        } finally {
            release(jedis, broken);
        }
    }

    /**
     * HMGET; the key is {@code key.toString()}.
     *
     * @param key    object whose string form is the hash key
     * @param fields the fields to read
     * @return the values as returned by Jedis, or {@code null} if the call failed
     */
    public static List<String> hmget(Object key, String... fields) {
        List<String> result = null;
        Jedis jedis = null;
        boolean broken = false;
        try {
            jedis = jedisPool.getResource();
            result = jedis.hmget(key.toString(), fields);
        } catch (Exception e) {
            broken = true;
            e.printStackTrace();
        } finally {
            release(jedis, broken);
        }
        return result;
    }

    /**
     * HKEYS for a string key.
     *
     * @param key the hash key
     * @return the field names, or {@code null} if the call failed
     */
    public static Set<String> hkeys(String key) {
        Set<String> result = null;
        Jedis jedis = null;
        boolean broken = false;
        try {
            jedis = jedisPool.getResource();
            result = jedis.hkeys(key);
        } catch (Exception e) {
            broken = true;
            e.printStackTrace();
        } finally {
            release(jedis, broken);
        }
        return result;
    }

    /**
     * LRANGE for a binary key.
     *
     * @param key  the list key
     * @param from start index passed to LRANGE
     * @param to   end index passed to LRANGE
     * @return the selected elements, or {@code null} if the call failed
     */
    public static List<byte[]> lrange(byte[] key, int from, int to) {
        List<byte[]> result = null;
        Jedis jedis = null;
        boolean broken = false;
        try {
            jedis = jedisPool.getResource();
            result = jedis.lrange(key, from, to);
        } catch (Exception e) {
            broken = true;
            e.printStackTrace();
        } finally {
            release(jedis, broken);
        }
        return result;
    }

    /**
     * HGETALL for a binary key.
     *
     * @param key the hash key
     * @return all field/value pairs, or {@code null} if the call failed
     */
    public static Map<byte[], byte[]> hgetAll(byte[] key) {
        Map<byte[], byte[]> result = null;
        Jedis jedis = null;
        boolean broken = false;
        try {
            jedis = jedisPool.getResource();
            result = jedis.hgetAll(key);
        } catch (Exception e) {
            broken = true;
            e.printStackTrace();
        } finally {
            release(jedis, broken);
        }
        return result;
    }

    /**
     * DEL for a binary key.
     *
     * @param key the key to delete
     */
    public static void del(byte[] key) {
        Jedis jedis = null;
        boolean broken = false;
        try {
            jedis = jedisPool.getResource();
            jedis.del(key);
        } catch (Exception e) {
            broken = true;
            e.printStackTrace();
        } finally {
            release(jedis, broken);
        }
    }

    /**
     * LLEN for a binary key.
     *
     * @param key the list key
     * @return the list length, or {@code 0} if the call failed
     */
    public static long llen(byte[] key) {
        long len = 0;
        Jedis jedis = null;
        boolean broken = false;
        try {
            jedis = jedisPool.getResource();
            // The command result must be captured; it was previously discarded.
            Long reply = jedis.llen(key);
            if (reply != null) {
                len = reply;
            }
        } catch (Exception e) {
            broken = true;
            e.printStackTrace();
        } finally {
            release(jedis, broken);
        }
        return len;
    }
}
然后对它进行测试。
package com.jedis.mq;
/**
 * Manual smoke test for {@code JedisMQ}: the static initializer pushes three
 * messages onto the list and {@link #main(String[])} pops one of them.
 */
public class TestRedisQuene {
    /** Redis list key used by this test. */
    public static byte[] redisKey = "key".getBytes();

    static {
        try {
            init();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Pops a single message and prints it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        pop();
    }

    /** Pops one element from the tail of the list and prints its id and content. */
    private static void pop() {
        byte[] bytes = JedisMQ.rpop(redisKey);
        if (bytes == null) {
            // rpop yields null when the list is empty or the Redis call failed;
            // there is nothing to deserialize in either case.
            return;
        }
        try {
            Message msg = (Message) ObjectUtil.bytesToObject(bytes);
            if (msg != null) {
                System.out.println(msg.getId() + " " + msg.getContent());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Pushes three sample messages onto the head of the list.
     *
     * @throws Exception if a message cannot be serialized
     */
    private static void init() throws Exception {
        Message msg1 = new Message(1, "内容1");
        JedisMQ.lpush(redisKey, ObjectUtil.objectToBytes(msg1));
        Message msg2 = new Message(2, "内容2");
        JedisMQ.lpush(redisKey, ObjectUtil.objectToBytes(msg2));
        Message msg3 = new Message(3, "内容3");
        JedisMQ.lpush(redisKey, ObjectUtil.objectToBytes(msg3));
    }
}
感谢支持
更多内容,请移步《超级个体》。