編輯:關於Android編程
在addNow方法執行結束之後,我們已經為當前NioProcessor裡面所有的NioSocketSession對應的SocketChannel注冊了OP_READ事件,接下來繼續查看Processor的run方法,源碼在上一篇中有,執行到第49行,判斷如果selected大於0執行第52行的process方法,selected的值其實就是Selector的select方法返回值,表示客戶端存在和服務端交互的請求,那麼我們看看process做了些什麼事:
AbstractPollingIoProcessor$process()
private void process() throws Exception {
for (Iterator i = selectedSessions(); i.hasNext();) {
S session = i.next();
process(session);
i.remove();
}
}
可以發現他就是遍歷那些已經發生注冊事件的NioSocketSession集合,並且調用process(S session)方法:
AbstractPollingIoProcessor$process()
private void process(S session) {
// Process Reads
if (isReadable(session) && !session.isReadSuspended()) {
read(session);
}
// Process writes
if (isWritable(session) && !session.isWriteSuspended()) {
// add the session to the queue, if it's not already there
if (session.setScheduledForFlush(true)) {
flushingSessions.add(session);
}
}
}
首先通過isReadable方法判斷當前NioSocketSession對應的SocketChannel中是否注冊過OP_READ事件,如果注冊過的話,執行read(session)方法;
AbstractPollingIoProcessor$read()
private void read(S session) {
IoSessionConfig config = session.getConfig();
int bufferSize = config.getReadBufferSize();
IoBuffer buf = IoBuffer.allocate(bufferSize);
final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
try {
int readBytes = 0;
int ret;
try {
if (hasFragmentation) {
while ((ret = read(session, buf)) > 0) {
readBytes += ret;
if (!buf.hasRemaining()) {
break;
}
}
} else {
ret = read(session, buf);
if (ret > 0) {
readBytes = ret;
}
}
} finally {
buf.flip();
}
if (readBytes > 0) {
IoFilterChain filterChain = session.getFilterChain();
filterChain.fireMessageReceived(buf);
buf = null;
if (hasFragmentation) {
if (readBytes << 1 < config.getReadBufferSize()) {
session.decreaseReadBufferSize();
} else if (readBytes == config.getReadBufferSize()) {
session.increaseReadBufferSize();
}
}
}
if (ret < 0) {
// scheduleRemove(session);
IoFilterChain filterChain = session.getFilterChain();
filterChain.fireInputClosed();
}
} catch (Exception e) {
if (e instanceof IOException) {
if (!(e instanceof PortUnreachableException)
|| !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
|| ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
scheduleRemove(session);
}
}
IoFilterChain filterChain = session.getFilterChain();
filterChain.fireExceptionCaught(e);
}
}
這部分代碼比較長,我們了解主干就可以了;
首先第4行創建了一個IoBuffer對象,其實這就是我們java NIO中的Buffer角色,接著看到調用了read(session,buf)方法,這個方法返回值大於0表示讀取數據成功,具體這個方法裡面執行了些什麼我們可以到NioProcessor裡面的read方法看看:
protected int read(NioSession session, IoBuffer buf) throws Exception {
ByteChannel channel = session.getChannel();
return channel.read(buf.buf());
}
其實很簡單了,就是將通道中的數據寫到我們的緩存中罷了,這就是NIO本身的用法;
如果我們讀取到了數據,就會執行第33行的if語句,在if語句塊中會執行IoFilterChain的fireMessageReceived方法,其實呢,IoFilterChain就是我們的責任鏈,前面分析源碼的過程中我們知道在創建NioSocketSession的時候會創建一個DefaultIoFilterChain出來,並且會在它裡面創建一個EntryImpl鏈,默認情況下會創建一個HeadFilter鏈頭和TailFilter鏈尾,那麼這裡的IoFilterChain其實就是對DefaultIoFilterChain進行轉換過來的,默認情況下也就值存在鏈頭和鏈尾了,我們在使用MINA的時候可以通過NioSocketAcceptor的getFilterChain獲得其對應的IoFilterChain,其實getFilterChain的真正實現是在AbstarctIoService裡面的,有了這個IoFilterChain之後,我們可以調用他的addLast方法為其添加我們自定義或者MINA自帶的Filter對象,addLast的真正實現是在DefaultIoFilterChain裡面的,我們可以看看:
public synchronized void addLast(String name, IoFilter filter) {
checkAddable(name);
register(tail.prevEntry, name, filter);
}
間接調用了register方法,來看看register
private void register(EntryImpl prevEntry, String name, IoFilter filter) {
EntryImpl newEntry = new EntryImpl(prevEntry, prevEntry.nextEntry, name, filter);
try {
filter.onPreAdd(this, name, newEntry.getNextFilter());
} catch (Exception e) {
throw new IoFilterLifeCycleException("onPreAdd(): " + name + ':' + filter + " in " + getSession(), e);
}
prevEntry.nextEntry.prevEntry = newEntry;
prevEntry.nextEntry = newEntry;
name2entry.put(name, newEntry);
try {
filter.onPostAdd(this, name, newEntry.getNextFilter());
} catch (Exception e) {
deregister0(newEntry);
throw new IoFilterLifeCycleException("onPostAdd(): " + name + ':' + filter + " in " + getSession(), e);
}
}
如果你鏈表操作很熟的話,會發現其實這裡進行的就是鏈表插入操作了,在第10行和11行可以體現出來,那麼我們這裡有個疑問了,鏈表操作的時候,我們只需要一個鏈頭就可以了,沒必要給鏈尾啊,這裡的鏈尾是干嘛的呀,我來告訴你答案吧,鏈尾其實就是用來鏈接我們的IoHandler對象的,IoHandler是我們整個責任鏈的結束部分,我們真正的業務邏輯的處理都是在它裡面完成的,所以你會發現在你使用MINA框架的時候,如果不給NioSocketAcceptor設置IoHandler的話是會報異常的,因為他是要進行業務邏輯處理的,沒有他你整個程序是沒法處理的,既然他是鏈接在鏈尾後面的,那麼我們就該看看TailFilter的實現了:
他是DefaultIoFilterChain的靜態內部類,代碼比較長,我就截取兩個方法,其他的方法類似啦:
private static class TailFilter extends IoFilterAdapter {
@Override
public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception {
try {
session.getHandler().sessionCreated(session);
} finally {
// Notify the related future.
ConnectFuture future = (ConnectFuture) session.removeAttribute(SESSION_CREATED_FUTURE);
if (future != null) {
future.setSession(session);
}
}
}
@Override
public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception {
session.getHandler().sessionOpened(session);
}
}
可以看到在TailFiler裡面執行的方法實際上都是執行的IoHandler中對應的方法啦,也就是這樣我們把Filter責任鏈和IoHandler聯系到了一起;
好了,扯得有點遠了,繼續回到我們的AbstractPollingIoProcessor裡面的read方法,第33行在我們獲取到數據之後首先會獲得我們的DefaultIoFilterChain責任鏈,並且調用fireMessageReceived方法,我們來看看fireMessageReceived方法:
這個方法位於DefaultIoFilterChain中
public void fireMessageReceived(Object message) {
if (message instanceof IoBuffer) {
session.increaseReadBytes(((IoBuffer) message).remaining(), System.currentTimeMillis());
}
callNextMessageReceived(head, session, message);
}
可以看到他調用的是callNextMessageReceived方法
private void callNextMessageReceived(Entry entry, IoSession session, Object message) {
try {
IoFilter filter = entry.getFilter();
NextFilter nextFilter = entry.getNextFilter();
filter.messageReceived(nextFilter, session, message);
} catch (Exception e) {
fireExceptionCaught(e);
} catch (Error e) {
fireExceptionCaught(e);
throw e;
}
}
在callNextMessageReceived方法中首先會獲得當前Filter對象,接著獲得當前Filter的nextFilter對象,接著調用filter的messageReceived方法,這個方法其實上執行的是DefaultIoFilterChain的messageReceived方法:
public void messageReceived(IoSession session, Object message) {
Entry nextEntry = EntryImpl.this.nextEntry;
callNextMessageReceived(nextEntry, session, message);
}
可以看到他還是執行的callNextMessageReceived方法,這樣層層遞歸的執行,直到Filter的鏈尾,那麼接下來就是執行IoHandler裡面對應的messageReceived方法進行具體的業務邏輯操作喽!這樣的話,整個read過程中涉及到的關鍵部分就結束啦!
接下來分析下write過程,如果我們想要給服務端發送消息內容的話,首先我們需要獲取到IoSession對象,這裡我們以NioSocketSession為例,發送消息調用的將是他的write方法,查看NioSocketSession發現他裡面沒有write方法,到他的父類NioSession查看也不存在,最後在AbstractIoSession找到啦;
AbstractIoSession$write()
public WriteFuture write(Object message) {
return write(message, null);
}
也就是說他執行的是兩個參數的write方法,
public WriteFuture write(Object message, SocketAddress remoteAddress) {
if (message == null) {
throw new IllegalArgumentException("Trying to write a null message : not allowed");
}
// We can't send a message to a connected session if we don't have
// the remote address
if (!getTransportMetadata().isConnectionless() && (remoteAddress != null)) {
throw new UnsupportedOperationException();
}
// If the session has been closed or is closing, we can't either
// send a message to the remote side. We generate a future
// containing an exception.
if (isClosing() || !isConnected()) {
WriteFuture future = new DefaultWriteFuture(this);
WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
WriteException writeException = new WriteToClosedSessionException(request);
future.setException(writeException);
return future;
}
FileChannel openedFileChannel = null;
// TODO: remove this code as soon as we use InputStream
// instead of Object for the message.
try {
if ((message instanceof IoBuffer) && !((IoBuffer) message).hasRemaining()) {
// Nothing to write : probably an error in the user code
throw new IllegalArgumentException("message is empty. Forgot to call flip()?");
} else if (message instanceof FileChannel) {
FileChannel fileChannel = (FileChannel) message;
message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
} else if (message instanceof File) {
File file = (File) message;
openedFileChannel = new FileInputStream(file).getChannel();
message = new FilenameFileRegion(file, openedFileChannel, 0, openedFileChannel.size());
}
} catch (IOException e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
return DefaultWriteFuture.newNotWrittenFuture(this, e);
}
// Now, we can write the message. First, create a future
WriteFuture writeFuture = new DefaultWriteFuture(this);
WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
// Then, get the chain and inject the WriteRequest into it
IoFilterChain filterChain = getFilterChain();
filterChain.fireFilterWrite(writeRequest);
// TODO : This is not our business ! The caller has created a
// FileChannel,
// he has to close it !
if (openedFileChannel != null) {
// If we opened a FileChannel, it needs to be closed when the write
// has completed
final FileChannel finalChannel = openedFileChannel;
writeFuture.addListener(new IoFutureListener() {
public void operationComplete(WriteFuture future) {
try {
finalChannel.close();
} catch (IOException e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
}
}
});
}
// Return the WriteFuture.
return writeFuture;
}
這部分源碼比較長,我們挑重點看,在第45行創建了一個WriteFuture對象,接著把write的消息內容以及WriteFuture對象對象作為參數封裝出來一個WriteRequest對象,第49行獲得了我們的責任鏈,和read過程一樣,我們也可以通過DefaultIoFilterChain的addLast方法添加自己創建的Filter對象,接著第50行調用DefaultIoFilterChain的fireFilterWrite方法
public void fireFilterWrite(WriteRequest writeRequest) {
callPreviousFilterWrite(tail, session, writeRequest);
}
可以看到這個方法執行的是callPreviousFilterWrite方法
private void callPreviousFilterWrite(Entry entry, IoSession session, WriteRequest writeRequest) {
try {
IoFilter filter = entry.getFilter();
NextFilter nextFilter = entry.getNextFilter();
filter.filterWrite(nextFilter, session, writeRequest);
} catch (Exception e) {
writeRequest.getFuture().setException(e);
fireExceptionCaught(e);
} catch (Error e) {
writeRequest.getFuture().setException(e);
fireExceptionCaught(e);
throw e;
}
}
和之前的read方法中責任鏈的執行過程一樣,也是首先獲取filter對象,同時獲取該filter對象的下一個nextFilter對象,調用他的filterWrite方法
@Override
public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
nextFilter.filterWrite(session, writeRequest);
}
它裡面調用的是filterWrite方法,這個方法裡面會繼續調用filterWrite方法,這樣層層遞歸,直到到達責任鏈的鏈頭,也就是HeadFilter為止,調用HeadFilter的filterWrite方法,HeadFilter是DefaultIoFilterChain的靜態內部類:
public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
AbstractIoSession s = (AbstractIoSession) session;
// Maintain counters.
if (writeRequest.getMessage() instanceof IoBuffer) {
IoBuffer buffer = (IoBuffer) writeRequest.getMessage();
// I/O processor implementation will call buffer.reset()
// it after the write operation is finished, because
// the buffer will be specified with messageSent event.
buffer.mark();
int remaining = buffer.remaining();
if (remaining > 0) {
s.increaseScheduledWriteBytes(remaining);
}
} else {
s.increaseScheduledWriteMessages();
}
WriteRequestQueue writeRequestQueue = s.getWriteRequestQueue();
if (!s.isWriteSuspended()) {
if (writeRequestQueue.isEmpty(session)) {
// We can write directly the message
s.getProcessor().write(s, writeRequest);
} else {
s.getWriteRequestQueue().offer(s, writeRequest);
s.getProcessor().flush(s);
}
} else {
s.getWriteRequestQueue().offer(s, writeRequest);
}
}
在HeadFilter的filterWrite方法裡面,你會看到有這麼一句代碼s.getProcessor(),他其實上就是獲得處理我們當前NioSocketSession的NioProcessor對象而已,那麼接下來的write操作是包含兩個參數的,我們的NioProcessor裡面並沒有實現這個方法,需要到他的父類AbstractPollingIoProcessor查看,代碼如下:
public void write(S session, WriteRequest writeRequest) {
WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
writeRequestQueue.offer(session, writeRequest);
if (!session.isWriteSuspended()) {
this.flush(session);
}
}
做的事還是比較少的,就是將當前的寫請求加入到我們當前NioSocketSession的寫請求隊列中,同時通過AbstractPollingIoProcessor的flush方法將NioSocketSession放入到flushingSessions隊列中,這個隊列主要存儲的是那些將要被flush的IoSession集合;我們來看看flush方法
AbstractPollingIoProcessor$flush
public final void flush(S session) {
// add the session to the queue if it's not already
// in the queue, then wake up the select()
if (session.setScheduledForFlush(true)) {
flushingSessions.add(session);
wakeup();
}
}
第5行執行了將當前NioSocketSession加入到flushingSession的操作,隨後調用了wakeup方法,wakeup方法會喚醒我們阻塞的select方法,這樣的話,我們的服務端就可以收到客戶端發送的消息了,接著讀取過程就和上面的源碼講解一樣了;
至此,MINA中主要的服務端源碼分析結束了,注意我們只分析了NioSocketAcceptor部分的源碼,沒有涉及NioSocketConnector部分,其實NioSocketConnector部分的源碼和NioSocketAcceptor部分基本上是類似的分析過程,在這裡我就不細細分析了,下一篇我會對MINA框架做一個小結,包括它裡面涉及到的一些線程模型結構;
Android開發之Android studio使用git gitlab(二)
1)首先先將gitlab上的開發項目clone到本地(可以使用命令行或者管理工具,具體操作在GitLab中已經涉及,這裡不再贅述),然後導入到AndroidStudio中
微信理財通安全嗎? 微信理財通的錢易被盜嗎?
現在微信理財通相對於余額寶收益高一些,即使理財通的收益較高,但很多網友都在擔心微信理財通是否安全的問題,是否能保障用戶的錢不怕被盜。我們一起來探究下吧!微信
Android 實用工具之emulator介紹
在android-sdk\tools目錄下,有一個名為emulator.exe的可執行程序,望名知義,emulator即為仿真器或模擬器,但很多人可能會發現,通過AVD
Android中MVP的初步認識與簡單用法
概述認識MVP模式MVP 模式實際上指的是 Model-View-Presenter 主要的目的是為了劃分各個模塊的負責區域,分工明確,使代碼清晰了很多。也是為了減少 A