中有两类线程,我们需要保证任务线程或者调度

java使用默认线程池踩过的坑(1)

场景

一个调度器,两个调度任务,分别处理两个目录下的txt文件,某个调度任务应对某些复杂问题的时候会持续特别长的时间,甚至有一直阻塞的可能。我们需要一个manager来管理这些task,当这个task的上一次执行时间距离现在超过5个调度周期的时候,就直接停掉这个线程,然后再重启它,保证两个目标目录下没有待处理的txt文件堆积。

问题

直接使用java默认的线程池调度task1和task2.由于外部txt的种种不可控原因,导致task2线程阻塞。现象就是task1和线程池调度器都正常运行着,但是task2迟迟没有动作。

当然,找到具体的阻塞原因并进行针对性解决是很重要的。但是,这种措施很可能并不能完全、彻底、全面的处理好所有未知情况。我们需要保证任务线程或者调度器的健壮性!

方案计划

线程池调度器并没有原生的针对被调度线程的业务运行状态进行监控处理的API。因为task2是阻塞在我们的业务逻辑里的,所以最好的方式是写一个TaskManager,所有的任务线程在执行任务前全部到这个TaskManager这里来注册自己。这个TaskManager就负责对于每个自己管辖范围内的task进行实时全程监控!

后面的重点就是如何处理超过5个执行周期的task了。

方案如下:

●一旦发现这个task线程,立即中止它,然后再次重启;

●一旦发现这个task线程,直接将整个pool清空并停止,重新放入这两个task ——task明确的情况下】;

方案实施

中止后重启

●Task实现类

class FileTask extends Thread { private long lastExecTime = 0; protected long interval = 10000; public long getLastExecTime() {     return lastExecTime; } public void setLastExecTime(long lastExecTime) {     this.lastExecTime = lastExecTime; } public long getInterval() {     return interval; } public void setInterval(long interval) {     this.interval = interval; }  public File[] getFiles() {     return null; } 

●Override

public void run() { while (!Thread.currentThread().isInterrupted()) { lastExecTime = System.currentTimeMillis(); System.out.println(Thread.currentThread().getName() + " is running -> " + new Date()); try { Thread.sleep(getInterval() * 6 * 1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); e.printStackTrace();    // 当线程池shutdown之后,这里就会抛出exception了             }         }     }     } 

●TaskManager

public class TaskManager  implements Runnable { private final static Log logger = LogFactory.getLog(TaskManager .class); public Set<FileTask> runners = new CopyOnWriteArraySet<FileTask>(); ExecutorService pool = Executors.newCachedThreadPool(); public void registerCodeRunnable(FileTask process) { runners.add(process); } public TaskManager (Set<FileTask> runners) { this.runners = runners; } 

@Override

public void run() {        while (!Thread.currentThread().isInterrupted()) {            try {                long current = System.currentTimeMillis();                for (FileTask wrapper : runners) {                    if (current - wrapper.getLastExecTime() > wrapper.getInterval() * 5) {                        wrapper.interrupt();                        for (File file : wrapper.getFiles()) {                            file.delete();                        }                     wrapper.start();                      }                }            } catch (Exception e1) {                logger.error("Error happens when we trying to interrupt and restart a task ");                ExceptionCollector.registerException(e1);            }            try {                Thread.sleep(500);            } catch (InterruptedException e) {            }        }    }     

这段代码会报错 java.lang.Thread IllegalThreadStateException。为什么呢?其实这是一个很基础的问题,您应该不会像我一样马虎。查看Thread.start()的注释, 有这样一段:

It is never legal to start a thread more than once. In particular, a thread may not be restarted once it has completed execution.

是的,一个线程不能够启动两次。那么它是怎么判断的呢?

public synchronized void start() {         /**          * A zero status value corresponds to state "NEW".    0对应的是state NEW          */ 

if (threadStatus != 0) //如果不是NEW state,就直接抛出异常!


图片 1


) 场景 一个调度器,两个调度任务,分别处理两个目录下的txt文件,某个调度任务应对某些复杂问题的时候会...

java中通用的线程池实例代码,java线程池实例

复制代码 代码如下:
package com.smart.frame.task.autoTask;

import java.util.Collection;
import java.util.Vector;

/**
 * 任务分发器
 */
public class TaskManage extends Thread
{
    protected Vector<Runnable> tasks = new Vector<Runnable>();
    protected boolean running = false;
    protected boolean stopped = false;
    protected boolean paused = false;
    protected boolean killed = false;
    private ThreadPool pool;

    public TaskManage(ThreadPool pool)
    {
        this.pool = pool;
    }

    public void putTask(Runnable task)
    {
        tasks.add(task);
    }

    public void putTasks(Runnable[] tasks)
    {
        for (int i = 0; i < tasks.length; i++)
            this.tasks.add(tasks[i]);
    }

    public void putTasks(Collection<Runnable> tasks)
    {
        this.tasks.addAll(tasks);
    }

    protected Runnable popTask()
    {
        if (tasks.size() > 0) return (Runnable) tasks.remove(0);
        else return null;
    }

    public boolean isRunning()
    {
        return running;
    }

    public void stopTasks()
    {
        stopped = true;
    }

    public void stopTasksSync()
    {
        stopTasks();
        while (isRunning())
        {
            try
            {
                sleep(5);
            }
            catch (InterruptedException e)
            {
                TaskException.getResultMessage(e);
            }
        }
    }

    public void pauseTasks()
    {
        paused = true;
    }

    public void pauseTasksSync()
    {
        pauseTasks();
        while (isRunning())
        {
            try
            {
                sleep(5);
            }
            catch (InterruptedException e)
            {
                TaskException.getResultMessage(e);
            }
        }
    }

    public void kill()
    {
        if (!running) interrupt();
        else killed = true;
    }

    public void killSync()
    {
        kill();
        while (isAlive())
        {
            try
            {
                sleep(5);
            }
            catch (InterruptedException e)
            {
                TaskException.getResultMessage(e);
            }
        }
    }

    public synchronized void startTasks()
    {
        running = true;
        this.notify();
    }

    public synchronized void run()
    {
        try
        {
            while (true)
            {
                if (!running || tasks.size() == 0)
                {
                    pool.notifyForIdleThread();
                    this.wait();
                }
                else
                {
                    Runnable task;
                    while ((task = popTask()) != null)
                    {
                        task.run();
                        if (stopped)
                        {
                            stopped = false;
                            if (tasks.size() > 0)
                            {
                                tasks.clear();
                                System.out.println(Thread.currentThread().getId() + ": Tasks are stopped");
                                break;
                            }
                        }
                        if (paused)
                        {
                            paused = false;
                            if (tasks.size() > 0)
                            {
                                System.out.println(Thread.currentThread().getId() + ": Tasks are paused");
                                break;
                            }
                        }
                    }
                    running = false;
                }

                if (killed)
                {
                    killed = false;
                    break;
                }
            }
        }
        catch (InterruptedException e)
        {
            TaskException.getResultMessage(e);
            return;
        }
    }
}

复制代码 代码如下:
package com.smart.frame.task.autoTask;

import java.util.Collection;
import java.util.Iterator;
import java.util.Vector;

/**
 * 线程池
 */
public class ThreadPool
{
    protected int maxPoolSize = TaskConfig.maxPoolSize;
    protected int initPoolSize = TaskConfig.initPoolSize;
    protected Vector<TaskManage> threads = new Vector<TaskManage>();
    protected boolean initialized = false;
    protected boolean hasIdleThread = false;

    public ThreadPool()
    {
        super();
    }

    public ThreadPool(int maxPoolSize, int initPoolSize)
    {
        this.maxPoolSize = maxPoolSize;
        this.initPoolSize = initPoolSize;
    }

    public void init()
    {
        initialized = true;
        for (int i = 0; i < initPoolSize; i++)
        {
            TaskManage thread = new TaskManage(this);
            thread.start();
            threads.add(thread);
        }
    }

    public void setMaxPoolSize(int maxPoolSize)
    {
        this.maxPoolSize = maxPoolSize;
        if (maxPoolSize < getPoolSize()) setPoolSize(maxPoolSize);
    }

    /**
     * 重设当前线程数 若需杀掉某线程,线程不会立刻杀掉,而会等到线程中的事
     * 务处理完成 但此方法会立刻从线程池中移除该线程,不会等待事务处理结束
     */
    public void setPoolSize(int size)
    {
        if (!initialized)
        {
            initPoolSize = size;
            return;
        }
        else if (size > getPoolSize())
        {
            for (int i = getPoolSize(); i < size && i < maxPoolSize; i++)
            {
                TaskManage thread = new TaskManage(this);
                thread.start();
                threads.add(thread);
            }
        }
        else if (size < getPoolSize())
        {
            while (getPoolSize() > size)
            {
                TaskManage th = (TaskManage) threads.remove(0);
                th.kill();
            }
        }
    }

    public int getPoolSize()
    {
        return threads.size();
    }

    protected void notifyForIdleThread()
    {
        hasIdleThread = true;
    }

    protected boolean waitForIdleThread()
    {
        hasIdleThread = false;
        while (!hasIdleThread && getPoolSize() >= maxPoolSize)
        {
            try
            {
                Thread.sleep(5);
            }
            catch (InterruptedException e)
            {
                TaskException.getResultMessage(e);
                return false;
            }
        }

        return true;
    }

    public synchronized TaskManage getIdleThread()
    {
        while (true)
        {
            for (Iterator<TaskManage> itr = threads.iterator(); itr.hasNext();)
            {
                TaskManage th = (TaskManage) itr.next();
                if (!th.isRunning()) return th;
            }

            if (getPoolSize() < maxPoolSize)
            {
                TaskManage thread = new TaskManage(this);
                thread.start();
                threads.add(thread);
                return thread;
            }

            if (waitForIdleThread() == false) return null;
        }
    }

    public void processTask(Runnable task)
    {
        TaskManage th = getIdleThread();
        if (th != null)
        {
            th.putTask(task);
            th.startTasks();
        }
    }

    public void processTasksInSingleThread(Runnable[] tasks)
    {
        TaskManage th = getIdleThread();
        if (th != null)
        {
            th.putTasks(tasks);
            th.startTasks();
        }
    }

    public void processTasksInSingleThread(Collection<Runnable> tasks)
    {
        TaskManage th = getIdleThread();
        if (th != null)
        {
            th.putTasks(tasks);
            th.startTasks();
        }
    }

}

复制代码 代码如下:
package com.smart.frame.task.autoTask;

public class TopTask implements Runnable
{

    private ThreadPool pool;

    public TopTask()
    {
        super();
    }

    public TopTask(ThreadPool pool)
    {
        super();
        this.pool = pool;
    }

    @Override
    public void run()
    {
        init();
        start();
    }

    /**
     * 初始化验证权限、参数之类
     */
    public void init()
    {

    }

    /**
     * 开始自动任务
     */
    public void start()
    {
        for (int i = 0; i < 10; i++)
        {
            pool.processTask(new BeginAuto());
        }
    }
}
/**
 * 实现类
 */
class BeginAuto implements Runnable
{
    @Override
    public void run()
    {
        System.out.println(Thread.currentThread().getId() + "..................");
    }

}

复制代码 代码如下: package com.smart.frame.task.autoTask; import java.util.Collection; import java.util.Vector;...

 

线程优先级

线程优先级的系统规则
线程是具有优先级的,高优先级的线程有更多CPU资源。
继承-线程优先级具有继承性,如果ThreadA启动了ThreadB,B默认具有和A一样的优先级。(this.priority = parent.getPriority();)
设置-优先级可以手动设置。
并行-高优先级的线程能获得更多CPU资源,但是低优先级的线程也能继续工作,高优先级并不会先执行
随机-高优先级的线程并不一定先执行,执行顺序是随机的。
线程优先级的设置
Thread和HandlerThread设置优先级的方式不一样。
Thread中线程优先级范围是1~10,数值越高,优先级越高,默认是5。
java.lang.Thread.setPriority(int i);
HandlerThread中线程优先级范围是-20~19,数值越低,优先级越高,默认为0。
android.os.Process.setThreadPriority(int p);
android.os.Process.setThreadPriority(int tid, int p);
HandlerThread还可以通过new HandlerThread("tname",-3)来设置。
在Thread或Runnable的run方法中,也可以通过Process.setThreadPriority设置优先级。
一般在实际应用中,通过Process设置优先级,对线程调度影响效果更明显,因为Process是android系统特别优化过的,是native的方法。

 private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        //死循环
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
  1. class MyDaemon implements Runnable {  
  2.   public void run() {  
  3.   for (long i = 0; i < 9999999L; i++) {  
  4.   System.out.println("后台线程第" + i + "次执行!");  
  5.   try {  
  6.   Thread.sleep(7);  
  7.   } catch (InterruptedException e) {  
  8.   e.printStackTrace();  
  9.   }  
  10.   }  
  11.   }  
  12.   }  

线程池

线程池可以节约创建和销毁线程的资源开销。

  1. 线程池常见的几个类的用法:
    ThreadPoolExecutor、Executor,Executors,ExecutorService,CompletionService,Future,Callable 等
  2. 线程池四个分类
    newCachedThreadPool、newFixedThreadPool、newScheduledThreadPool和SingleThreadExecutor
  3. 自定义线程池 ThreadPoolExecutor
    线程池工作原理
    核心线程数、等待队列、处理策略等

写到这里差点忘记了我们当初的目的,我们的目的是查看线程池是如何保证核心线程不被销毁的。
看到这里终于出现了一点眉目,在任务添加成功后,我们发现了t.start()

 

线程操作

同步
同步方式:synchronized和lock
同步相关方法:wait()/notify()/notifyAll() sleep()/join()/yield() await()/signal()/signalAll

  private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        //死循环
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            //如果当前线程池的状态时SHUTDOWN,STOP,TIDYING,TERMINATED并且为SHUTDOWN状态时任务队列为空,那么就返回false  原因:如果调用了shutdown方法,此时的线程池还会继续工作并且会在任务队列中的所有任务执行完成后才会结束线程池。
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            //死循环
            for (;;) {
                int wc = workerCountOf(c);
                //core是在execute方法中传的参数,true表示 核心线程,false表示最大线程 
                 //CAPACITY  可以理解为Integer的最大值  1左移29位再-1
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //如果增加任务数量成功那么退出这个循环执行下面的代码,否则继续    
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //这行代码仔细记着  稍后分析
            w = new Worker(firstTask);
            final Thread t = w.thread;

            if (t != null) {
                //同步块 使用内置锁 锁住
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                      //再判断一次当前线程池的状态  避免在执行过程中线程时被使用者关闭
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //向正在执行的任务队列(workers)中添加work    区别一下:workqueue是等待执行的阻塞队列
                        workers.add(w);
                        int s = workers.size();
                        //记录曾经并发执行的最大任务个数
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                            //添加任务成功
                        workerAdded = true;
                    }
                } finally {
                    //finally块释放内置锁    
                    mainLock.unlock();
                }
                //如果任务添加成功那么开始执行任务
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

如果我们手工使用JDK Timer(Quartz的Scheduler),在Web容器启动时启动Timer,当Web容器关闭时,除非你手工关闭这个Timer,否则Timer中的任务还会继续运行!

引用

全面理解Java内存模型
Java多线程系列目录(共43篇)
Android开发——Android中常见的4种线程池(保证你能看懂并理解)
Handler、Thread和Runnable简单分析

我们通常都是通过执行execute(Runnable command)方法来向线程池提交一个不需要返回结果的任务的(如果你需要返回结果那么就是 <T> Future<T> submit(Callable<T> task)方法),怀着一颗探索的心,敲敲翻开了线程池的源码:

  实际上:JRE判断程序是否执行结束的标准是所有的前台执线程行完毕了,而不管后台线程的状态,因此,在使用后台县城时候一定要注意这个问题。 

sleep() wait() join() yield()

sleep阻塞暂停,不释放锁和cpu
wait释放锁和cpu,需要被notify才能唤醒
join是暂停当前线程,先执行join进来的线程
yield是把当前线程设为可执行状态,给同等或更高优先级的其他线程一个执行机会,线程yield之后,很有可能继续恢复执行

  Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** 重写了run方法  */
        public void run() {
            runWorker(this);
        }
  1. public class Test {  
  2.   public static void main(String args) {  
  3.   Thread t1 = new MyCommon();  
  4.   Thread t2 = new Thread(new MyDaemon());  
  5.   t2.setDaemon(true); //设置为守护线程  
  6.   t2.start();  
  7.   t1.start();  
  8.   }  
  9.   }  
  10.   class MyCommon extends Thread {  
  11.   public void run() {  
  12.   for (int i = 0; i < 5; i++) {  
  13.   System.out.println("线程1第" + i + "次执行!");  
  14.   try {  
  15.   Thread.sleep(7);  
  16.   } catch (InterruptedException e) {  
  17.   e.printStackTrace();  
  18.   }  
  19.   }  
  20.   }  
  21.   }  

线程的定义和状态

创建、就绪、运行、阻塞、停止

今天看到了别人的一个代码,为了实现每小时重启一下MQ拉取消息,他使用的是Thread.sleep(1000*60*60)方法,然后重启MQ。我一看到就非常头疼啊。。为什么要使用这种方式而不使用java的线程池呢?于是我就问他,他说当时为了方便。大家都知道Thread.sleep期间是不会释放共享资源的,会造成死锁现象。然后我就想Thread.sleep可以在睡觉过程中等待被interrupt中断,然后继续工作。那么线程池是怎么保证他的核心线程不释放 而一直等待任务的执行的呢?难道我们一直理解的线程run方法执行完毕线程就销毁是不正确的?而且还有我们为何通过设置allowCoreThreadTimeOut(true) 就能使核心线程销毁的呢?

后台线程第0次执行!
  线程1第0次执行! 
  线程1第1次执行! 
  后台线程第1次执行! 
  后台线程第2次执行! 
  线程1第2次执行! 
  线程1第3次执行! 
  后台线程第3次执行! 
  线程1第4次执行! 
  后台线程第4次执行! 
  后台线程第5次执行! 
  后台线程第6次执行! 
  后台线程第7次执行! 
  Process finished with exit code 0 
  从上面的执行结果可以看出: 
  前台线程是保证执行完毕的,后台线程还没有执行完毕就退出了。 

Handler、Thread和Runnable

Runnable是接口,可避免单继承局限,使用更灵活,且一个实例可以给多个thread共享。
Thread其实是一个对象,thread有thread.run()和thread.start()两种方法,run方法其实没有新建线程,而是在当前线程中直接执行;start才会真正创建一个线程,start带有synchronized同步锁,且一个nativeCreate的native方法去请求CPU,这个函数会把Thread自己的实例传进去,是c++实现的,sleep/interrupt等方法都是在这里实现的。
Handler是用来和Looper中的消息队列交互的,handler通过ThreadLocal获取Looper的MessageQueue,可以向queue中添加message,message的target又指向handler,这样Looper循环处理消息时,会把消息再交给handler去处理。

关键代码就是那个while循环。如果task不为空执行task否则从getTask()中取任务。在执行完任务后会在finally 块中设置task = null;
顺便介绍一下,在这里,我们可以看到beforeExecute(Thread t, Runnable r)方法和afterExecute(Runnable r, Throwable t)会在任务的执行前后执行,我们可以通过继承线程池的方式来重写这两个方法,这样就能够对任务的执行进行监控啦。
咋一看 好像没什么问题。其实我们可以发现如果执行完一个任务 task 设置为 null。就要调用 getTask()方法 。 点进去查看一下。

这里有几点需要注意: 

常用的线程类

Thread和HandlerThread
在使用场景上,HandlerThread更节省资源:
如果多次调用new Thread(){...},会创造多个匿名线程,销毁资源
使用HandlerThread,是通过Looper缓存任务,重用线程,节省资源开销
Thread、Runnable和Callable
Runnable是个接口,实现更加灵活,而且一个Runnable的实例可以被多个Thread复用

可不要被迷惑,这里的t是通过work.thread; 得到的。这时候我们需要查看work类中的run方法。
work在ThreadPoolExecutor为一个内部类实现了Runnable接口。只有一个构造方法

图片 2 

有人可能不理解ctl.get()简单解释一下,线程池是通过Integer类型的高3为表述当前线程池的状态RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED 。低29位表示当前线程的运行任务数量。然后通过位运算来计算运行状态和任务的数量。
解释一下execute的执行流程

本文由必威发布于必威-运维,转载请注明出处:中有两类线程,我们需要保证任务线程或者调度

TAG标签:
Ctrl+D 将本页面保存为书签,全面了解最新资讯,方便快捷。